Posted to issues@maven.apache.org by GitBox <gi...@apache.org> on 2022/10/21 12:02:00 UTC

[GitHub] [maven-indexer] cstamas opened a new pull request, #255: [MINDEXER-151] Proof of concept

cstamas opened a new pull request, #255:
URL: https://github.com/apache/maven-indexer/pull/255

   This change (ingesting the GZIP file's raw records into the Lucene index on multiple threads) halves the execution time on my PC: while BasicUsageExample on master takes over 15 minutes to finish a full update, with this PR it finishes in under 7 minutes.
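
   For readers skimming the thread, a minimal standalone sketch of the pattern this patch uses may help: one producer reads documents sequentially, a fixed pool of workers each feeds its own private "silo" IndexWriter, and the finished silos are merged into the target index at the end. Everything below (class and variable names, the in-memory ByteBuffersDirectory, the document count) is illustrative only, not the actual PR code:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;

   import org.apache.lucene.analysis.standard.StandardAnalyzer;
   import org.apache.lucene.document.Document;
   import org.apache.lucene.document.Field;
   import org.apache.lucene.document.StringField;
   import org.apache.lucene.index.IndexWriter;
   import org.apache.lucene.index.IndexWriterConfig;
   import org.apache.lucene.store.ByteBuffersDirectory;

   public class ParallelIngestSketch
   {
       public static void main( String[] args ) throws Exception
       {
           int threads = Runtime.getRuntime().availableProcessors();
           BlockingQueue<Document> queue = new ArrayBlockingQueue<>( 10000 );
           Document theEnd = new Document(); // sentinel: one per worker marks end of input

           // one private "silo" writer per worker, so workers never contend on a shared writer
           List<IndexWriter> silos = new ArrayList<>();
           ExecutorService pool = Executors.newFixedThreadPool( threads );
           for ( int i = 0; i < threads; i++ )
           {
               IndexWriter silo = new IndexWriter( new ByteBuffersDirectory(),
                       new IndexWriterConfig( new StandardAnalyzer() ) );
               silos.add( silo );
               pool.execute( () ->
               {
                   try
                   {
                       Document doc;
                       while ( ( doc = queue.take() ) != theEnd )
                       {
                           silo.addDocument( doc ); // the expensive part, now done in parallel
                       }
                   }
                   catch ( Exception e )
                   {
                       throw new RuntimeException( e );
                   }
               } );
           }

           // single producer: read records sequentially and fan them out to the workers
           for ( int i = 0; i < 100_000; i++ )
           {
               Document doc = new Document();
               doc.add( new StringField( "id", Integer.toString( i ), Field.Store.YES ) );
               queue.put( doc );
           }
           for ( int i = 0; i < threads; i++ )
           {
               queue.put( theEnd );
           }
           pool.shutdown();
           pool.awaitTermination( 5L, TimeUnit.MINUTES );

           // fold the per-thread silos into the target index in one pass
           try ( IndexWriter target = new IndexWriter( new ByteBuffersDirectory(),
                   new IndexWriterConfig( new StandardAnalyzer() ) ) )
           {
               for ( IndexWriter silo : silos )
               {
                   silo.close();
                   target.addIndexes( silo.getDirectory() );
               }
               target.commit();
           }
       }
   }
   ```

   The per-thread silos are the central design choice: they sidestep contention on a single shared IndexWriter, and the final `addIndexes` call is cheap because Lucene copies the silo segments instead of re-analyzing the documents.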


[GitHub] [maven-indexer] mbien commented on a diff in pull request #255: [MINDEXER-151] Speed up Index update from remote

Posted by GitBox <gi...@apache.org>.
mbien commented on code in PR #255:
URL: https://github.com/apache/maven-indexer/pull/255#discussion_r1028796299


##########
indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java:
##########
@@ -88,44 +133,175 @@ public IndexDataReadResult readIndex( IndexWriter w, IndexingContext context )
         int n = 0;
 
         Document doc;
-        Set<String> rootGroups = new LinkedHashSet<>();
-        Set<String> allGroups = new LinkedHashSet<>();
+        ConcurrentMap<String, Boolean> rootGroups = new ConcurrentHashMap<>();
+        ConcurrentMap<String, Boolean> allGroups = new ConcurrentHashMap<>();
 
         while ( ( doc = readDocument() ) != null )
         {
-            ArtifactInfo ai = IndexUtils.constructArtifactInfo( doc, context );
-            if ( ai != null )
+            addToIndex( doc, context, w, rootGroups, allGroups );
+            n++;
+        }
+
+        w.commit();
+
+        IndexDataReadResult result = new IndexDataReadResult();
+        result.setDocumentCount( n );
+        result.setTimestamp( date );
+        result.setRootGroups( rootGroups.keySet() );
+        result.setAllGroups( allGroups.keySet() );
+
+        LOGGER.debug( "Reading ST index done in {} sec", Duration.between( start, Instant.now() ).getSeconds() );
+        return result;
+    }
+
+    private IndexDataReadResult readIndexMT( IndexWriter w, IndexingContext context )
+            throws IOException
+    {
+        LOGGER.debug( "Reading MT index..." );
+        Instant start = Instant.now();
+        long timestamp = readHeader();
+
+        int n = 0;
+
+        final Document theEnd = new Document();
+
+        ConcurrentMap<String, Boolean> rootGroups = new ConcurrentHashMap<>();
+        ConcurrentMap<String, Boolean> allGroups = new ConcurrentHashMap<>();
+        ArrayBlockingQueue<Document> queue = new ArrayBlockingQueue<>( 10000 );
+
+        ExecutorService executorService = Executors.newFixedThreadPool( threads );
+        ArrayList<Exception> errors = new ArrayList<>();
+        ArrayList<IndexWriter> silos = new ArrayList<>( threads );
+        for ( int i = 0; i < threads; i++ )
+        {
+            final int silo = i;
+            silos.add( tempWriter( "silo" + i ) );
+            executorService.execute( () ->
             {
-                w.addDocument( IndexUtils.updateDocument( doc, context, false, ai ) );
+                LOGGER.debug( "Starting thread {}", Thread.currentThread().getName() );
+                try
+                {
+                    while ( true )
+                    {
+                        try
+                        {
+                            Document doc = queue.take();
+                            if ( doc == theEnd )
+                            {
+                                break;
+                            }
+                            addToIndex( doc, context, silos.get( silo ), rootGroups, allGroups );
+                        }
+                        catch ( InterruptedException | IOException e )
+                        {
+                            errors.add( e );
+                            break;
+                        }
+                    }
+                }
+                finally
+                {
+                    LOGGER.debug( "Done thread {}", Thread.currentThread().getName() );
+                }
+            } );
+        }
 
-                rootGroups.add( ai.getRootGroup() );
-                allGroups.add( ai.getGroupId() );
-            }
-            else if ( doc.getField( ArtifactInfo.ALL_GROUPS ) != null
-                    || doc.getField( ArtifactInfo.ROOT_GROUPS ) != null )
+        try
+        {
+            Document doc;
+            while ( ( doc = readDocument() ) != null )
             {
-                // skip it
+                queue.put( doc );
+                n++;
             }
-            else
+            LOGGER.debug( "Signalling END" );
+            for ( int i = 0; i < threads; i++ )
             {
-                w.addDocument( doc );
+                queue.put( theEnd );
             }
-            n++;
+
+            LOGGER.debug( "Shutting down threads" );
+            executorService.shutdown();
+            executorService.awaitTermination( 5L, TimeUnit.MINUTES );
+        }
+        catch ( InterruptedException e )
+        {
+            throw new IOException( "Interrupted", e );
+        }
+
+        if ( !errors.isEmpty() )
+        {
+            IOException exception = new IOException( "Error during load of index" );
+            errors.forEach( exception::addSuppressed );
+            throw exception;
+        }
+
+        LOGGER.debug( "Silos loaded..." );
+        Date date = null;
+        if ( timestamp != -1 )
+        {
+            date = new Date( timestamp );
+            IndexUtils.updateTimestamp( w.getDirectory(), date );
         }
 
+        LOGGER.debug( "Merging silos..." );
+        for ( IndexWriter silo : silos )
+        {
+            IndexUtils.close( silo );
+            w.addIndexes( silo.getDirectory() );
+        }
+
+        LOGGER.debug( "Merged silos..." );
         w.commit();

Review Comment:
   I have been experimenting with this and MT extraction in the NetBeans maven support modules ([#4999](https://github.com/apache/netbeans/pull/4999)), and it works great!
   
   Could maven-indexer clean up the temp silo folders after the merge? It is currently leaving almost 6 GB behind after a full update.
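
   Not part of the patch, just a sketch of what such a cleanup could look like once `w.addIndexes( silo.getDirectory() )` has run. The helper name is made up, and it assumes the silos live in `FSDirectory` temp folders (the real locations come from `tempWriter`):

   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.Comparator;
   import java.util.stream.Stream;

   import org.apache.lucene.store.Directory;
   import org.apache.lucene.store.FSDirectory;

   public final class SiloCleanup
   {
       private SiloCleanup()
       {
       }

       /** Best-effort removal of a silo's temp folder after it has been merged. */
       public static void deleteSiloDir( Directory dir ) throws IOException
       {
           if ( !( dir instanceof FSDirectory ) )
           {
               return; // nothing on disk to clean up
           }
           Path root = ( (FSDirectory) dir ).getDirectory();
           dir.close();
           try ( Stream<Path> files = Files.walk( root ) )
           {
               // delete children before their parent directories
               files.sorted( Comparator.reverseOrder() ).forEach( p ->
               {
                   try
                   {
                       Files.delete( p );
                   }
                   catch ( IOException e )
                   {
                       // best effort: a leftover file beats a failed index update
                   }
               } );
           }
       }
   }
   ```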





[GitHub] [maven-indexer] cstamas merged pull request #255: [MINDEXER-151] Speed up Index update from remote

Posted by GitBox <gi...@apache.org>.
cstamas merged PR #255:
URL: https://github.com/apache/maven-indexer/pull/255




[GitHub] [maven-indexer] cstamas commented on a diff in pull request #255: [MINDEXER-151] Speed up Index update from remote

Posted by GitBox <gi...@apache.org>.
cstamas commented on code in PR #255:
URL: https://github.com/apache/maven-indexer/pull/255#discussion_r1030448625


##########
indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java:
##########
(quoting the same IndexDataReader.java hunk shown in full above)

Review Comment:
   Yes, good idea, I forgot about it, sorry. Created https://issues.apache.org/jira/browse/MINDEXER-176


