Posted to issues@maven.apache.org by GitBox <gi...@apache.org> on 2022/11/23 13:33:10 UTC

[GitHub] [maven-indexer] cstamas commented on a diff in pull request #255: [MINDEXER-151] Speed up Index update from remote

cstamas commented on code in PR #255:
URL: https://github.com/apache/maven-indexer/pull/255#discussion_r1030448625


##########
indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java:
##########
@@ -88,44 +133,175 @@ public IndexDataReadResult readIndex( IndexWriter w, IndexingContext context )
         int n = 0;
 
         Document doc;
-        Set<String> rootGroups = new LinkedHashSet<>();
-        Set<String> allGroups = new LinkedHashSet<>();
+        ConcurrentMap<String, Boolean> rootGroups = new ConcurrentHashMap<>();
+        ConcurrentMap<String, Boolean> allGroups = new ConcurrentHashMap<>();
 
         while ( ( doc = readDocument() ) != null )
         {
-            ArtifactInfo ai = IndexUtils.constructArtifactInfo( doc, context );
-            if ( ai != null )
+            addToIndex( doc, context, w, rootGroups, allGroups );
+            n++;
+        }
+
+        w.commit();
+
+        IndexDataReadResult result = new IndexDataReadResult();
+        result.setDocumentCount( n );
+        result.setTimestamp( date );
+        result.setRootGroups( rootGroups.keySet() );
+        result.setAllGroups( allGroups.keySet() );
+
+        LOGGER.debug( "Reading ST index done in {} sec", Duration.between( start, Instant.now() ).getSeconds() );
+        return result;
+    }
+
+    private IndexDataReadResult readIndexMT( IndexWriter w, IndexingContext context )
+            throws IOException
+    {
+        LOGGER.debug( "Reading MT index..." );
+        Instant start = Instant.now();
+        long timestamp = readHeader();
+
+        int n = 0;
+
+        final Document theEnd = new Document();
+
+        ConcurrentMap<String, Boolean> rootGroups = new ConcurrentHashMap<>();
+        ConcurrentMap<String, Boolean> allGroups = new ConcurrentHashMap<>();
+        ArrayBlockingQueue<Document> queue = new ArrayBlockingQueue<>( 10000 );
+
+        ExecutorService executorService = Executors.newFixedThreadPool( threads );
+        ArrayList<Exception> errors = new ArrayList<>();
+        ArrayList<IndexWriter> silos = new ArrayList<>( threads );
+        for ( int i = 0; i < threads; i++ )
+        {
+            final int silo = i;
+            silos.add( tempWriter( "silo" + i ) );
+            executorService.execute( () ->
             {
-                w.addDocument( IndexUtils.updateDocument( doc, context, false, ai ) );
+                LOGGER.debug( "Starting thread {}", Thread.currentThread().getName() );
+                try
+                {
+                    while ( true )
+                    {
+                        try
+                        {
+                            Document doc = queue.take();
+                            if ( doc == theEnd )
+                            {
+                                break;
+                            }
+                            addToIndex( doc, context, silos.get( silo ), rootGroups, allGroups );
+                        }
+                        catch ( InterruptedException | IOException e )
+                        {
+                            errors.add( e );
+                            break;
+                        }
+                    }
+                }
+                finally
+                {
+                    LOGGER.debug( "Done thread {}", Thread.currentThread().getName() );
+                }
+            } );
+        }
 
-                rootGroups.add( ai.getRootGroup() );
-                allGroups.add( ai.getGroupId() );
-            }
-            else if ( doc.getField( ArtifactInfo.ALL_GROUPS ) != null
-                    || doc.getField( ArtifactInfo.ROOT_GROUPS ) != null )
+        try
+        {
+            Document doc;
+            while ( ( doc = readDocument() ) != null )
             {
-                // skip it
+                queue.put( doc );
+                n++;
             }
-            else
+            LOGGER.debug( "Signalling END" );
+            for ( int i = 0; i < threads; i++ )
             {
-                w.addDocument( doc );
+                queue.put( theEnd );
             }
-            n++;
+
+            LOGGER.debug( "Shutting down threads" );
+            executorService.shutdown();
+            executorService.awaitTermination( 5L, TimeUnit.MINUTES );
+        }
+        catch ( InterruptedException e )
+        {
+            throw new IOException( "Interrupted", e );
+        }
+
+        if ( !errors.isEmpty() )
+        {
+            IOException exception = new IOException( "Error during load of index" );
+            errors.forEach( exception::addSuppressed );
+            throw exception;
+        }
+
+        LOGGER.debug( "Silos loaded..." );
+        Date date = null;
+        if ( timestamp != -1 )
+        {
+            date = new Date( timestamp );
+            IndexUtils.updateTimestamp( w.getDirectory(), date );
         }
 
+        LOGGER.debug( "Merging silos..." );
+        for ( IndexWriter silo : silos )
+        {
+            IndexUtils.close( silo );
+            w.addIndexes( silo.getDirectory() );
+        }
+
+        LOGGER.debug( "Merged silos..." );
         w.commit();

Review Comment:
   Yes, good idea; I forgot about it, sorry. Created https://issues.apache.org/jira/browse/MINDEXER-176
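
For context on the multi-threaded path in the diff above: a single producer reads documents into a bounded queue, a fixed pool of workers drains the queue into per-thread temporary indexes ("silos"), one sentinel document per worker signals end of input, and the silos are merged into the main writer at the end. The sketch below shows only that producer/consumer shape using the JDK alone; the Lucene-specific pieces (Document, IndexWriter, addToIndex, addIndexes) are stood in for by plain strings and lists, and the class and variable names are illustrative, not part of the PR.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SiloReadSketch
{
    // Sentinel ("poison pill") compared by identity, like `theEnd` in the diff above.
    private static final String THE_END = new String( "EOF" );

    public static void main( String[] args ) throws Exception
    {
        int threads = Runtime.getRuntime().availableProcessors();
        BlockingQueue<String> queue = new ArrayBlockingQueue<>( 10_000 );

        // One "silo" per worker; in the PR these are temporary Lucene IndexWriters.
        List<List<String>> silos = new ArrayList<>( threads );
        ExecutorService pool = Executors.newFixedThreadPool( threads );
        for ( int i = 0; i < threads; i++ )
        {
            List<String> silo = new ArrayList<>();
            silos.add( silo );
            pool.execute( () ->
            {
                try
                {
                    while ( true )
                    {
                        String doc = queue.take();
                        if ( doc == THE_END ) // identity check: stop on the sentinel
                        {
                            break;
                        }
                        silo.add( "indexed:" + doc ); // stands in for addToIndex( ... )
                    }
                }
                catch ( InterruptedException e )
                {
                    Thread.currentThread().interrupt();
                }
            } );
        }

        // Single producer: read records and hand them to the workers.
        for ( int n = 0; n < 100; n++ )
        {
            queue.put( "doc-" + n );
        }
        // One sentinel per worker so every thread sees the end of input.
        for ( int i = 0; i < threads; i++ )
        {
            queue.put( THE_END );
        }
        pool.shutdown();
        pool.awaitTermination( 5, TimeUnit.MINUTES );

        // Final merge step, analogous to w.addIndexes( silo.getDirectory() ) in the PR.
        List<String> merged = new ArrayList<>();
        silos.forEach( merged::addAll );
        System.out.println( "merged " + merged.size() + " documents from " + threads + " silos" );
    }
}

As in the PR, the bounded queue applies back-pressure on the producer, and handing each worker its own silo avoids contention on a shared writer until the single merge at the end.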


