You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by cs...@apache.org on 2022/04/28 10:40:10 UTC

[maven-indexer] 01/01: [MINDEXER-151] Proof of concept

This is an automated email from the ASF dual-hosted git repository.

cstamas pushed a commit to branch MINDEXER-151-poc
in repository https://gitbox.apache.org/repos/asf/maven-indexer.git

commit d6e8a38821635ad16a7534b4f4917218be3e2096
Author: Tamas Cservenak <ta...@cservenak.net>
AuthorDate: Thu Apr 28 12:38:49 2022 +0200

    [MINDEXER-151] Proof of concept
    
    This change (ingesting the raw records of the GZIP index file into the
    Lucene index on multiple threads) roughly halves the execution time on
    my PC: BasicUsageExample on master takes over 15 minutes to finish
    (when doing a full update), while with this PR it finishes in under
    7 minutes.
---
 .../maven/index/updater/IndexDataReader.java       | 144 ++++++++++++++++-----
 indexer-examples/indexer-examples-basic/pom.xml    |   2 +-
 .../maven/index/examples/BasicUsageExample.java    |   8 +-
 3 files changed, 122 insertions(+), 32 deletions(-)

diff --git a/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java b/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java
index b1c4237..e4beae9 100644
--- a/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java
+++ b/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java
@@ -26,9 +26,17 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UTFDataFormatException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Date;
-import java.util.LinkedHashSet;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.GZIPInputStream;
 
 import org.apache.lucene.document.Document;
@@ -39,6 +47,8 @@ import org.apache.lucene.index.IndexWriter;
 import org.apache.maven.index.ArtifactInfo;
 import org.apache.maven.index.context.IndexUtils;
 import org.apache.maven.index.context.IndexingContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An index data reader used to parse transfer index format.
@@ -47,10 +57,12 @@ import org.apache.maven.index.context.IndexingContext;
  */
 public class IndexDataReader
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger( IndexDataReader.class );
+
     private final DataInputStream dis;
 
     public IndexDataReader( final InputStream is )
-        throws IOException
+            throws IOException
     {
         // MINDEXER-13
         // LightweightHttpWagon may have performed automatic decompression
@@ -72,8 +84,11 @@ public class IndexDataReader
     }
 
     public IndexDataReadResult readIndex( IndexWriter w, IndexingContext context )
-        throws IOException
+            throws IOException
     {
+        LOGGER.info( "Reading index..." );
+        Instant start = Instant.now();
+
         long timestamp = readHeader();
 
         Date date = null;
@@ -87,45 +102,114 @@ public class IndexDataReader
 
         int n = 0;
 
-        Document doc;
-        Set<String> rootGroups = new LinkedHashSet<>();
-        Set<String> allGroups = new LinkedHashSet<>();
+        final Document END = new Document();
 
-        while ( ( doc = readDocument() ) != null )
+        ConcurrentMap<String, Boolean> rootGroups = new ConcurrentHashMap<>();
+        ConcurrentMap<String, Boolean> allGroups = new ConcurrentHashMap<>();
+        ArrayBlockingQueue<Document> queue = new ArrayBlockingQueue<>( 10000 );
+        int threads = Runtime.getRuntime().availableProcessors() / 2;
+        ExecutorService executorService = Executors.newFixedThreadPool( threads );
+        ArrayList<Exception> errors = new ArrayList<>();
+
+        for ( int i = 0; i < threads; i++ )
         {
-            ArtifactInfo ai = IndexUtils.constructArtifactInfo( doc, context );
-            if ( ai != null )
-            {
-                w.addDocument( IndexUtils.updateDocument( doc, context, false, ai ) );
+            executorService.execute( () -> {
+                LOGGER.info( "Starting thread {}", Thread.currentThread().getName() );
+                try
+                {
+                    while ( true )
+                    {
+                        try
+                        {
+                            Document doc = queue.take();
+                            if ( doc == END )
+                            {
+                                break;
+                            }
+                            addToIndex( doc, context, w, rootGroups, allGroups );
+                        }
+                        catch ( InterruptedException | IOException e )
+                        {
+                            errors.add( e );
+                            break;
+                        }
+                    }
+                }
+                finally
+                {
+                    LOGGER.info( "Done thread {}", Thread.currentThread().getName() );
+                }
+            } );
+        }
 
-                rootGroups.add( ai.getRootGroup() );
-                allGroups.add( ai.getGroupId() );
-            }
-            else if ( doc.getField( ArtifactInfo.ALL_GROUPS ) != null
-                    || doc.getField( ArtifactInfo.ROOT_GROUPS ) != null )
+        try
+        {
+            Document doc;
+            while ( ( doc = readDocument() ) != null )
             {
-                // skip it
+                queue.put( doc );
+                n++;
             }
-            else
+            LOGGER.info( "Signalling END" );
+            for ( int i = 0; i < threads; i++ )
             {
-                w.addDocument( doc );
+                queue.put( END );
             }
-            n++;
+
+            LOGGER.info( "Shutting down threads" );
+            executorService.shutdown();
+            executorService.awaitTermination( 5L, TimeUnit.MINUTES );
+        }
+        catch ( InterruptedException e )
+        {
+            throw new IOException( "Interrupted", e );
         }
 
+        if ( !errors.isEmpty() )
+        {
+            IOException exception = new IOException( "Error during load of index" );
+            errors.forEach( exception::addSuppressed );
+            throw exception;
+        }
+
+        LOGGER.info( "Commit..." );
         w.commit();
 
         IndexDataReadResult result = new IndexDataReadResult();
         result.setDocumentCount( n );
         result.setTimestamp( date );
-        result.setRootGroups( rootGroups );
-        result.setAllGroups( allGroups );
+        result.setRootGroups( rootGroups.keySet() );
+        result.setAllGroups( allGroups.keySet() );
 
+        LOGGER.info( "Reading index done in {} sec", Duration.between( start, Instant.now() ).getSeconds() );
         return result;
     }
 
+    /**
+     * Adds one raw record to the index; called concurrently by the readIndex worker threads (group maps are
+     * ConcurrentMaps; Lucene's IndexWriter is thread safe). Artifact records go through IndexUtils.updateDocument
+     * and their groups are collected; records carrying ALL_GROUPS or ROOT_GROUPS are skipped; others are added as-is.
+     */
+    private void addToIndex( final Document doc, final IndexingContext context, final IndexWriter indexWriter,
+                             final ConcurrentMap<String, Boolean> rootGroups,
+                             final ConcurrentMap<String, Boolean> allGroups )
+            throws IOException
+    {
+        ArtifactInfo ai = IndexUtils.constructArtifactInfo( doc, context );
+        if ( ai != null )
+        {
+            indexWriter.addDocument( IndexUtils.updateDocument( doc, context, false, ai ) );
+            rootGroups.putIfAbsent( ai.getRootGroup(), Boolean.TRUE );
+            allGroups.putIfAbsent( ai.getGroupId(), Boolean.TRUE );
+        }
+        else if ( doc.getField( ArtifactInfo.ALL_GROUPS ) == null && doc.getField( ArtifactInfo.ROOT_GROUPS ) == null )
+        {
+            indexWriter.addDocument( doc );
+        }
+    }
+
     public long readHeader()
-        throws IOException
+            throws IOException
     {
         final byte hdrbyte = (byte) ( ( IndexDataWriter.VERSION << 24 ) >> 24 );
 
@@ -139,7 +223,7 @@ public class IndexDataReader
     }
 
     public Document readDocument()
-        throws IOException
+            throws IOException
     {
         int fieldCount;
         try
@@ -160,7 +244,7 @@ public class IndexDataReader
 
         // Fix up UINFO field wrt MINDEXER-41
         final Field uinfoField = (Field) doc.getField( ArtifactInfo.UINFO );
-        final String info =  doc.get( ArtifactInfo.INFO );
+        final String info = doc.get( ArtifactInfo.INFO );
         if ( uinfoField != null && info != null && !info.isEmpty() )
         {
             final String[] splitInfo = ArtifactInfo.FS_PATTERN.split( info );
@@ -179,7 +263,7 @@ public class IndexDataReader
     }
 
     private Field readField()
-        throws IOException
+            throws IOException
     {
         int flags = dis.read();
 
@@ -199,7 +283,7 @@ public class IndexDataReader
     }
 
     private static String readUTF( DataInput in )
-        throws IOException
+            throws IOException
     {
         int utflen = in.readInt();
 
@@ -214,7 +298,7 @@ public class IndexDataReader
         catch ( OutOfMemoryError e )
         {
             throw new IOException( "Index data content is inappropriate (is junk?), leads to OutOfMemoryError!"
-                + " See MINDEXER-28 for more information!", e );
+                    + " See MINDEXER-28 for more information!", e );
         }
 
         int c, char2, char3;
@@ -282,7 +366,7 @@ public class IndexDataReader
                         throw new UTFDataFormatException( "malformed input around byte " + ( count - 1 ) );
                     }
                     chararr[chararrCount++] =
-                        (char) ( ( ( c & 0x0F ) << 12 ) | ( ( char2 & 0x3F ) << 6 ) | ( ( char3 & 0x3F ) ) );
+                            (char) ( ( ( c & 0x0F ) << 12 ) | ( ( char2 & 0x3F ) << 6 ) | ( ( char3 & 0x3F ) ) );
                     break;
 
                 default:
@@ -360,7 +444,7 @@ public class IndexDataReader
      * @throws IOException in case of an IO exception during index file access
      */
     public IndexDataReadResult readIndex( final IndexDataReadVisitor visitor, final IndexingContext context )
-        throws IOException
+            throws IOException
     {
         dis.readByte(); // data format version
 
diff --git a/indexer-examples/indexer-examples-basic/pom.xml b/indexer-examples/indexer-examples-basic/pom.xml
index 1881758..7b45ed1 100644
--- a/indexer-examples/indexer-examples-basic/pom.xml
+++ b/indexer-examples/indexer-examples-basic/pom.xml
@@ -58,7 +58,7 @@ under the License.
       <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-simple</artifactId>
-        <scope>test</scope>
+        <scope>runtime</scope>
       </dependency>
 
       <dependency>
diff --git a/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java b/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java
index 7d79508..722947d 100644
--- a/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java
+++ b/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java
@@ -68,6 +68,8 @@ import org.eclipse.aether.version.Version;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
@@ -142,14 +144,17 @@ public class BasicUsageExample
         // Preferred frequency is once a week.
         if ( true )
         {
+            Instant updateStart = Instant.now();
             System.out.println( "Updating Index..." );
             System.out.println( "This might take a while on first run, so please be patient!" );
             // Create ResourceFetcher implementation to be used with IndexUpdateRequest
             // Here, we use Wagon based one as shorthand, but all we need is a ResourceFetcher implementation
             TransferListener listener = new AbstractTransferListener()
             {
+                Instant start;
                 public void transferStarted( TransferEvent transferEvent )
                 {
+                    start = Instant.now();
                     System.out.print( "  Downloading " + transferEvent.getResource().getName() );
                 }
 
@@ -159,7 +164,7 @@ public class BasicUsageExample
 
                 public void transferCompleted( TransferEvent transferEvent )
                 {
-                    System.out.println( " - Done" );
+                    System.out.println( " - Done in " + Duration.between( start, Instant.now() ).getSeconds() + " sec" );
                 }
             };
             ResourceFetcher resourceFetcher = new WagonHelper.WagonFetcher( httpWagon, listener, null, null );
@@ -182,6 +187,7 @@ public class BasicUsageExample
                         + updateResult.getTimestamp() + " period." );
             }
 
+            System.out.println( "Finished in " + Duration.between( updateStart, Instant.now() ).getSeconds() + " sec" );
             System.out.println();
         }