You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by ti...@apache.org on 2022/02/21 08:46:50 UTC
[maven-surefire] branch release/2.22.3 updated: [SUREFIRE-2019] ThreadedStreamConsumer - use Thread.join() instead of CountDownLatch.await()
This is an automated email from the ASF dual-hosted git repository.
tibordigana pushed a commit to branch release/2.22.3
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git
The following commit(s) were added to refs/heads/release/2.22.3 by this push:
new e7fe0dd [SUREFIRE-2019] ThreadedStreamConsumer - use Thread.join() instead of CountDownLatch.await()
e7fe0dd is described below
commit e7fe0ddda6211babe39f565da739db3a86c89264
Author: tibor.digana <ti...@apache.org>
AuthorDate: Sun Feb 20 15:46:26 2022 +0100
[SUREFIRE-2019] ThreadedStreamConsumer - use Thread.join() instead of CountDownLatch.await()
---
.../output/ThreadedStreamConsumer.java | 223 +++++++++++++++++----
1 file changed, 181 insertions(+), 42 deletions(-)
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
index 820ecf3..9369bce 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
@@ -19,35 +19,41 @@ package org.apache.maven.plugin.surefire.booterclient.output;
* under the License.
*/
-import org.apache.maven.shared.utils.cli.StreamConsumer;
-import org.apache.maven.surefire.util.internal.DaemonThreadFactory;
+import javax.annotation.Nonnull;
import java.io.Closeable;
import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+import org.apache.maven.shared.utils.cli.StreamConsumer;
import static java.lang.Thread.currentThread;
+import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThread;
/**
- * Knows how to reconstruct *all* the state transmitted over stdout by the forked process.
+ * Knows how to reconstruct *all* the state transmitted over Channel by the forked process.
+ * <br>
+ * After applying the performance improvements with {@link QueueSynchronizer}, the throughput is
+ * 6.33 million messages per second
+ * (158 nanoseconds per message, 5 million messages within 0.79 seconds - see the test ThreadedStreamConsumerTest)
+ * on an i5 dual-core 2.6 GHz CPU with Oracle JDK 11.
*
* @author Kristian Rosenvold
*/
public final class ThreadedStreamConsumer
implements StreamConsumer, Closeable
{
+ private static final int QUEUE_MAX_ITEMS = 10000;
private static final String END_ITEM = "";
- private static final int ITEM_LIMIT_BEFORE_SLEEP = 10 * 1000;
-
- private final BlockingQueue<String> items = new ArrayBlockingQueue<String>( ITEM_LIMIT_BEFORE_SLEEP );
-
+ private final QueueSynchronizer<String> synchronizer = new QueueSynchronizer<String>( QUEUE_MAX_ITEMS, END_ITEM );
private final AtomicBoolean stop = new AtomicBoolean();
-
- private final Thread thread;
-
+ private final AtomicBoolean isAlive = new AtomicBoolean( true );
+ private final Thread consumer;
private final Pumper pumper;
final class Pumper
@@ -76,22 +82,28 @@ public final class ThreadedStreamConsumer
@Override
public void run()
{
- while ( !ThreadedStreamConsumer.this.stop.get() )
+ while ( !stop.get() || !synchronizer.isEmptyQueue() )
{
try
{
- String item = ThreadedStreamConsumer.this.items.take();
+ String item = synchronizer.awaitNext();
+
if ( shouldStopQueueing( item ) )
{
- return;
+ break;
}
+
target.consumeLine( item );
}
catch ( Throwable t )
{
+ // make sure the stack trace is populated in the exception instance before it is stored
+ t.getStackTrace();
errors.addException( t );
}
}
+
+ isAlive.set( false );
}
boolean hasErrors()
@@ -108,27 +120,27 @@ public final class ThreadedStreamConsumer
public ThreadedStreamConsumer( StreamConsumer target )
{
pumper = new Pumper( target );
- thread = DaemonThreadFactory.newDaemonThread( pumper, "ThreadedStreamConsumer" );
- thread.start();
+ Thread consumer = newDaemonThread( pumper, "ThreadedStreamConsumer" );
+ consumer.setUncaughtExceptionHandler( new UncaughtExceptionHandler()
+ {
+ @Override
+ public void uncaughtException( Thread t, Throwable e )
+ {
+ isAlive.set( false );
+ }
+ } );
+ consumer.start();
+ this.consumer = consumer;
}
@Override
- public void consumeLine( String s )
+ public void consumeLine( @Nonnull String s )
{
- if ( stop.get() && !thread.isAlive() )
- {
- items.clear();
- return;
- }
-
- try
+ // Do NOT call Thread.isAlive() here - it is slow.
+ // It degrades the performance from 790 millis to 1250 millis for 5 million messages.
+ if ( !stop.get() && isAlive.get() )
{
- items.put( s );
- }
- catch ( InterruptedException e )
- {
- currentThread().interrupt();
- throw new IllegalStateException( e );
+ synchronizer.pushNext( s );
}
}
@@ -136,20 +148,29 @@ public final class ThreadedStreamConsumer
public void close()
throws IOException
{
- if ( !stop.get() )
+ isAlive.compareAndSet( true, consumer.isAlive() );
+ if ( stop.compareAndSet( false, true ) && isAlive.get() )
{
- try
- {
- items.put( END_ITEM );
- thread.join();
- }
- catch ( InterruptedException e )
+ if ( currentThread().isInterrupted() )
{
- currentThread().interrupt();
+ synchronizer.markStopped();
+ consumer.interrupt();
}
- finally
+ else
{
- stop.set( true );
+ synchronizer.markStopped();
+
+ try
+ {
+ consumer.join();
+ }
+ catch ( InterruptedException e )
+ {
+ // we deliberately do not set interrupted=true in this Thread;
+ // NOTE: join() throws InterruptedException when this (calling) Thread is interrupted, not the consumer's Thread
+ }
+
+ synchronizer.clearQueue();
}
}
@@ -165,8 +186,126 @@ public final class ThreadedStreamConsumer
* @param item element from <code>items</code>
* @return {@code true} if tail of the queue
*/
- private boolean shouldStopQueueing( String item )
+ private static boolean shouldStopQueueing( String item )
{
return item == END_ITEM;
}
+
+ /**
+ * This synchronization helper mostly avoids locking.
+ * If the queue size has reached zero or {@code maxQueueSize} then the threads are blocked (parked/unparked).
+ * The thread T1 is the reader (see the class "Pumper") and T2 is the writer (see the method "consumeLine").
+ *
+ * @param <T> element type in the queue
+ */
+ static class QueueSynchronizer<T>
+ {
+ private final SyncT1 t1 = new SyncT1();
+ private final SyncT2 t2 = new SyncT2();
+ private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<T>();
+ private final AtomicInteger queueSize = new AtomicInteger();
+ private final int maxQueueSize;
+ private final T stopItemMarker;
+
+ QueueSynchronizer( int maxQueueSize, T stopItemMarker )
+ {
+ this.maxQueueSize = maxQueueSize;
+ this.stopItemMarker = stopItemMarker;
+ }
+
+ private class SyncT1 extends AbstractQueuedSynchronizer
+ {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected int tryAcquireShared( int arg )
+ {
+ return queueSize.get() == 0 ? -1 : 1;
+ }
+
+ @Override
+ protected boolean tryReleaseShared( int arg )
+ {
+ return true;
+ }
+
+ void waitIfZero() throws InterruptedException
+ {
+ acquireSharedInterruptibly( 1 );
+ }
+
+ void release()
+ {
+ releaseShared( 0 );
+ }
+ }
+
+ private class SyncT2 extends AbstractQueuedSynchronizer
+ {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected int tryAcquireShared( int arg )
+ {
+ return queueSize.get() < maxQueueSize ? 1 : -1;
+ }
+
+ @Override
+ protected boolean tryReleaseShared( int arg )
+ {
+ return true;
+ }
+
+ void awaitMax()
+ {
+ acquireShared( 1 );
+ }
+
+ void tryRelease()
+ {
+ if ( queueSize.get() == 0 )
+ {
+ releaseShared( 0 );
+ }
+ }
+ }
+
+ void markStopped()
+ {
+ addNext( stopItemMarker );
+ }
+
+ void pushNext( T t )
+ {
+ t2.awaitMax();
+ addNext( t );
+ }
+
+ T awaitNext() throws InterruptedException
+ {
+ t2.tryRelease();
+ t1.waitIfZero();
+ queueSize.decrementAndGet();
+ return queue.poll();
+ }
+
+ boolean isEmptyQueue()
+ {
+ return queue.isEmpty();
+ }
+
+ void clearQueue()
+ {
+ queue.clear();
+ }
+
+ private void addNext( T t )
+ {
+ queue.add( t );
+ if ( queueSize.getAndIncrement() == 0 )
+ {
+ t1.release();
+ }
+ }
+ }
}