You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by cs...@apache.org on 2022/04/28 10:40:10 UTC
[maven-indexer] 01/01: [MINDEXER-151] Proof of concept
This is an automated email from the ASF dual-hosted git repository.
cstamas pushed a commit to branch MINDEXER-151-poc
in repository https://gitbox.apache.org/repos/asf/maven-indexer.git
commit d6e8a38821635ad16a7534b4f4917218be3e2096
Author: Tamas Cservenak <ta...@cservenak.net>
AuthorDate: Thu Apr 28 12:38:49 2022 +0200
[MINDEXER-151] Proof of concept
This change (ingesting the raw records of the GZIP index file into the Lucene
index on multiple threads) halves the execution time on my PC:
on master, BasicUsageExample takes over 15 minutes to
finish a full update, while with this PR it finishes in under
7 minutes.
---
.../maven/index/updater/IndexDataReader.java | 144 ++++++++++++++++-----
indexer-examples/indexer-examples-basic/pom.xml | 2 +-
.../maven/index/examples/BasicUsageExample.java | 8 +-
3 files changed, 122 insertions(+), 32 deletions(-)
diff --git a/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java b/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java
index b1c4237..e4beae9 100644
--- a/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java
+++ b/indexer-core/src/main/java/org/apache/maven/index/updater/IndexDataReader.java
@@ -26,9 +26,16 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UTFDataFormatException;
+import java.time.Duration;
+import java.time.Instant;
import java.util.Date;
-import java.util.LinkedHashSet;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import org.apache.lucene.document.Document;
@@ -39,6 +46,8 @@ import org.apache.lucene.index.IndexWriter;
import org.apache.maven.index.ArtifactInfo;
import org.apache.maven.index.context.IndexUtils;
import org.apache.maven.index.context.IndexingContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An index data reader used to parse transfer index format.
@@ -47,10 +56,12 @@ import org.apache.maven.index.context.IndexingContext;
*/
public class IndexDataReader
{
+ private static final Logger LOGGER = LoggerFactory.getLogger( IndexDataReader.class );
+
private final DataInputStream dis;
public IndexDataReader( final InputStream is )
- throws IOException
+ throws IOException
{
// MINDEXER-13
// LightweightHttpWagon may have performed automatic decompression
@@ -72,8 +83,11 @@ public class IndexDataReader
}
public IndexDataReadResult readIndex( IndexWriter w, IndexingContext context )
- throws IOException
+ throws IOException
{
+ LOGGER.info( "Reading index..." );
+ Instant start = Instant.now();
+
long timestamp = readHeader();
Date date = null;
@@ -87,45 +101,144 @@ public class IndexDataReader
int n = 0;
- Document doc;
- Set<String> rootGroups = new LinkedHashSet<>();
- Set<String> allGroups = new LinkedHashSet<>();
+ // poison pill: one END is queued per worker to tell it that no more documents follow
+ final Document END = new Document();
- while ( ( doc = readDocument() ) != null )
+ ConcurrentMap<String, Boolean> rootGroups = new ConcurrentHashMap<>();
+ ConcurrentMap<String, Boolean> allGroups = new ConcurrentHashMap<>();
+ ArrayBlockingQueue<Document> queue = new ArrayBlockingQueue<>( 10000 );
+ // at least one worker: newFixedThreadPool( 0 ) would throw on single-CPU hosts
+ int threads = Math.max( 1, Runtime.getRuntime().availableProcessors() / 2 );
+ ExecutorService executorService = Executors.newFixedThreadPool( threads );
+ // filled concurrently by the worker threads, hence a thread-safe set
+ Set<Exception> errors = ConcurrentHashMap.newKeySet();
+
+ for ( int i = 0; i < threads; i++ )
{
- ArtifactInfo ai = IndexUtils.constructArtifactInfo( doc, context );
- if ( ai != null )
- {
- w.addDocument( IndexUtils.updateDocument( doc, context, false, ai ) );
+ executorService.execute( () -> {
+ LOGGER.info( "Starting thread {}", Thread.currentThread().getName() );
+ try
+ {
+ Document doc;
+ while ( ( doc = queue.take() ) != END )
+ {
+ // after a failure only drain the queue, so that the producer can never block
+ // forever on a full queue once every worker has stopped indexing
+ if ( errors.isEmpty() )
+ {
+ try
+ {
+ addToIndex( doc, context, w, rootGroups, allGroups );
+ }
+ catch ( IOException | RuntimeException e )
+ {
+ errors.add( e );
+ }
+ }
+ }
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
+ errors.add( e );
+ }
+ finally
+ {
+ LOGGER.info( "Done thread {}", Thread.currentThread().getName() );
+ }
+ } );
+ }
- rootGroups.add( ai.getRootGroup() );
- allGroups.add( ai.getGroupId() );
- }
- else if ( doc.getField( ArtifactInfo.ALL_GROUPS ) != null
- || doc.getField( ArtifactInfo.ROOT_GROUPS ) != null )
+ try
+ {
+ Document doc;
+ // stop reading early once a worker has reported a failure
+ while ( errors.isEmpty() && ( doc = readDocument() ) != null )
{
- // skip it
+ queue.put( doc );
+ n++;
}
- else
+ LOGGER.info( "Signalling END" );
+ for ( int i = 0; i < threads; i++ )
{
- w.addDocument( doc );
+ queue.put( END );
}
- n++;
+
+ LOGGER.info( "Shutting down threads" );
+ executorService.shutdown();
+ if ( !executorService.awaitTermination( 5L, TimeUnit.MINUTES ) )
+ {
+ throw new IOException( "Timed out waiting for the indexing threads to finish" );
+ }
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
+ throw new IOException( "Interrupted", e );
}
+ finally
+ {
+ if ( !executorService.isTerminated() )
+ {
+ // abnormal exit: discard pending documents and stop the workers with END pills rather
+ // than interrupts (with NIO-based Lucene directories an interrupt during I/O can close the index files)
+ queue.clear();
+ for ( int i = 0; i < threads; i++ )
+ {
+ queue.offer( END );
+ }
+ executorService.shutdown();
+ }
+ }
+ if ( !errors.isEmpty() )
+ {
+ IOException exception = new IOException( "Error during load of index" );
+ errors.forEach( exception::addSuppressed );
+ throw exception;
+ }
+
+ LOGGER.info( "Commit..." );
w.commit();
IndexDataReadResult result = new IndexDataReadResult();
result.setDocumentCount( n );
result.setTimestamp( date );
- result.setRootGroups( rootGroups );
- result.setAllGroups( allGroups );
+ result.setRootGroups( rootGroups.keySet() );
+ result.setAllGroups( allGroups.keySet() );
+ LOGGER.info( "Reading index done in {} sec", Duration.between( start, Instant.now() ).getSeconds() );
return result;
}
+ /**
+ * Adds one document read from the transfer format to the index. Invoked concurrently by the
+ * worker threads of readIndex, hence the concurrent maps used to collect the group names.
+ */
+ private void addToIndex( final Document doc, final IndexingContext context, final IndexWriter indexWriter,
+ final ConcurrentMap<String, Boolean> rootGroups,
+ final ConcurrentMap<String, Boolean> allGroups )
+ throws IOException
+ {
+ ArtifactInfo ai = IndexUtils.constructArtifactInfo( doc, context );
+ if ( ai != null )
+ {
+ indexWriter.addDocument( IndexUtils.updateDocument( doc, context, false, ai ) );
+
+ rootGroups.putIfAbsent( ai.getRootGroup(), Boolean.TRUE );
+ allGroups.putIfAbsent( ai.getGroupId(), Boolean.TRUE );
+ }
+ else if ( doc.getField( ArtifactInfo.ALL_GROUPS ) == null
+ && doc.getField( ArtifactInfo.ROOT_GROUPS ) == null )
+ {
+ // group-list documents (ALL_GROUPS / ROOT_GROUPS) are not copied, the groups are collected
+ // in rootGroups/allGroups instead; any other non-artifact document is copied as is
+ indexWriter.addDocument( doc );
+ }
+ }
+
public long readHeader()
- throws IOException
+ throws IOException
{
final byte hdrbyte = (byte) ( ( IndexDataWriter.VERSION << 24 ) >> 24 );
@@ -139,7 +223,7 @@ public class IndexDataReader
}
public Document readDocument()
- throws IOException
+ throws IOException
{
int fieldCount;
try
@@ -160,7 +244,7 @@ public class IndexDataReader
// Fix up UINFO field wrt MINDEXER-41
final Field uinfoField = (Field) doc.getField( ArtifactInfo.UINFO );
- final String info = doc.get( ArtifactInfo.INFO );
+ final String info = doc.get( ArtifactInfo.INFO );
if ( uinfoField != null && info != null && !info.isEmpty() )
{
final String[] splitInfo = ArtifactInfo.FS_PATTERN.split( info );
@@ -179,7 +263,7 @@ public class IndexDataReader
}
private Field readField()
- throws IOException
+ throws IOException
{
int flags = dis.read();
@@ -199,7 +283,7 @@ public class IndexDataReader
}
private static String readUTF( DataInput in )
- throws IOException
+ throws IOException
{
int utflen = in.readInt();
@@ -214,7 +298,7 @@ public class IndexDataReader
catch ( OutOfMemoryError e )
{
throw new IOException( "Index data content is inappropriate (is junk?), leads to OutOfMemoryError!"
- + " See MINDEXER-28 for more information!", e );
+ + " See MINDEXER-28 for more information!", e );
}
int c, char2, char3;
@@ -282,7 +366,7 @@ public class IndexDataReader
throw new UTFDataFormatException( "malformed input around byte " + ( count - 1 ) );
}
chararr[chararrCount++] =
- (char) ( ( ( c & 0x0F ) << 12 ) | ( ( char2 & 0x3F ) << 6 ) | ( ( char3 & 0x3F ) ) );
+ (char) ( ( ( c & 0x0F ) << 12 ) | ( ( char2 & 0x3F ) << 6 ) | ( ( char3 & 0x3F ) ) );
break;
default:
@@ -360,7 +444,7 @@ public class IndexDataReader
* @throws IOException in case of an IO exception during index file access
*/
public IndexDataReadResult readIndex( final IndexDataReadVisitor visitor, final IndexingContext context )
- throws IOException
+ throws IOException
{
dis.readByte(); // data format version
diff --git a/indexer-examples/indexer-examples-basic/pom.xml b/indexer-examples/indexer-examples-basic/pom.xml
index 1881758..7b45ed1 100644
--- a/indexer-examples/indexer-examples-basic/pom.xml
+++ b/indexer-examples/indexer-examples-basic/pom.xml
@@ -58,7 +58,7 @@ under the License.
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
- <scope>test</scope>
+ <scope>runtime</scope>
</dependency>
<dependency>
diff --git a/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java b/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java
index 7d79508..722947d 100644
--- a/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java
+++ b/indexer-examples/indexer-examples-basic/src/main/java/org/apache/maven/index/examples/BasicUsageExample.java
@@ -68,6 +68,8 @@ import org.eclipse.aether.version.Version;
import java.io.File;
import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
@@ -142,14 +144,17 @@ public class BasicUsageExample
// Preferred frequency is once a week.
if ( true )
{
+ Instant updateStart = Instant.now();
System.out.println( "Updating Index..." );
System.out.println( "This might take a while on first run, so please be patient!" );
// Create ResourceFetcher implementation to be used with IndexUpdateRequest
// Here, we use Wagon based one as shorthand, but all we need is a ResourceFetcher implementation
TransferListener listener = new AbstractTransferListener()
{
+ Instant start;
public void transferStarted( TransferEvent transferEvent )
{
+ start = Instant.now();
System.out.print( " Downloading " + transferEvent.getResource().getName() );
}
@@ -159,7 +164,7 @@ public class BasicUsageExample
public void transferCompleted( TransferEvent transferEvent )
{
- System.out.println( " - Done" );
+ System.out.println( " - Done in " + Duration.between( start, Instant.now() ).getSeconds() + " sec" );
}
};
ResourceFetcher resourceFetcher = new WagonHelper.WagonFetcher( httpWagon, listener, null, null );
@@ -182,6 +187,7 @@ public class BasicUsageExample
+ updateResult.getTimestamp() + " period." );
}
+ System.out.println( "Finished in " + Duration.between( updateStart, Instant.now() ).getSeconds() + " sec" );
System.out.println();
}