You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by ti...@apache.org on 2022/02/21 08:43:43 UTC
[maven-surefire] branch master updated: [SUREFIRE-2019] ThreadedStreamConsumer - use Thread.join() instead of CountDownLatch.await()
This is an automated email from the ASF dual-hosted git repository.
tibordigana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git
The following commit(s) were added to refs/heads/master by this push:
new 892b9db [SUREFIRE-2019] ThreadedStreamConsumer - use Thread.join() instead of CountDownLatch.await()
892b9db is described below
commit 892b9db9d7ed8f472bd6d88f24b70441744e828e
Author: tibor.digana <ti...@apache.org>
AuthorDate: Sun Feb 20 12:25:14 2022 +0100
[SUREFIRE-2019] ThreadedStreamConsumer - use Thread.join() instead of CountDownLatch.await()
---
.../output/ThreadedStreamConsumer.java | 52 +++++++++++++---------
1 file changed, 32 insertions(+), 20 deletions(-)
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
index f1f6722..fd9b6b7 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
@@ -19,18 +19,19 @@ package org.apache.maven.plugin.surefire.booterclient.output;
* under the License.
*/
-import org.apache.maven.surefire.api.event.Event;
-import org.apache.maven.surefire.extensions.EventHandler;
-
import javax.annotation.Nonnull;
+
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import org.apache.maven.surefire.api.event.Event;
+import org.apache.maven.surefire.extensions.EventHandler;
+
+import static java.lang.Thread.currentThread;
import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
/**
@@ -51,9 +52,9 @@ public final class ThreadedStreamConsumer
private final QueueSynchronizer<Event> synchronizer = new QueueSynchronizer<>( QUEUE_MAX_ITEMS, END_ITEM );
private final AtomicBoolean stop = new AtomicBoolean();
- private final CountDownLatch threadJoiner = new CountDownLatch( 1 );
+ private final AtomicBoolean isAlive = new AtomicBoolean( true );
+ private final Thread consumer;
private final Pumper pumper;
- private volatile boolean isAlive = true;
final class Pumper
implements Runnable
@@ -102,8 +103,7 @@ public final class ThreadedStreamConsumer
}
}
- threadJoiner.countDown();
- isAlive = false;
+ isAlive.set( false );
}
boolean hasErrors()
@@ -120,8 +120,10 @@ public final class ThreadedStreamConsumer
public ThreadedStreamConsumer( EventHandler<Event> target )
{
pumper = new Pumper( target );
- Thread thread = newDaemonThread( pumper, "ThreadedStreamConsumer" );
- thread.start();
+ Thread consumer = newDaemonThread( pumper, "ThreadedStreamConsumer" );
+ consumer.setUncaughtExceptionHandler( ( t, e ) -> isAlive.set( false ) );
+ consumer.start();
+ this.consumer = consumer;
}
@Override
@@ -129,7 +131,7 @@ public final class ThreadedStreamConsumer
{
// Do NOT call Thread.isAlive() - slow.
// It makes worse performance from 790 millis to 1250 millis for 5 million messages.
- if ( !stop.get() && isAlive )
+ if ( !stop.get() && isAlive.get() )
{
synchronizer.pushNext( event );
}
@@ -139,20 +141,30 @@ public final class ThreadedStreamConsumer
public void close()
throws IOException
{
- if ( stop.compareAndSet( false, true ) && isAlive )
+ isAlive.compareAndSet( true, consumer.isAlive() );
+ if ( stop.compareAndSet( false, true ) && isAlive.get() )
{
- synchronizer.markStopped();
-
- try
+ if ( currentThread().isInterrupted() )
{
- threadJoiner.await();
+ synchronizer.markStopped();
+ consumer.interrupt();
}
- catch ( InterruptedException e )
+ else
{
- Thread.currentThread().interrupt();
- }
+ synchronizer.markStopped();
+
+ try
+ {
+ consumer.join();
+ }
+ catch ( InterruptedException e )
+ {
+ // we should not set interrupted=true in this Thread
+ // if consumer's Thread was interrupted which is indicated by InterruptedException
+ }
- synchronizer.clearQueue();
+ synchronizer.clearQueue();
+ }
}
if ( pumper.hasErrors() )