You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by ti...@apache.org on 2022/02/20 11:25:38 UTC
[maven-surefire] 01/01: TSC - Thread.isAlive and interruption
This is an automated email from the ASF dual-hosted git repository.
tibordigana pushed a commit to branch event-consumer-isalive
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git
commit 1139aa5a1ccf882f3c7d6961b2ee0e41dddb2529
Author: tibor.digana <ti...@apache.org>
AuthorDate: Sun Feb 20 12:25:14 2022 +0100
TSC - Thread.isAlive and interruption
---
.../output/ThreadedStreamConsumer.java | 51 +++++++++++++---------
1 file changed, 31 insertions(+), 20 deletions(-)
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
index f1f6722..e9ccdbd 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
@@ -19,18 +19,18 @@ package org.apache.maven.plugin.surefire.booterclient.output;
* under the License.
*/
-import org.apache.maven.surefire.api.event.Event;
-import org.apache.maven.surefire.extensions.EventHandler;
-
import javax.annotation.Nonnull;
+
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import org.apache.maven.surefire.api.event.Event;
+import org.apache.maven.surefire.extensions.EventHandler;
+
import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
/**
@@ -51,9 +51,9 @@ public final class ThreadedStreamConsumer
private final QueueSynchronizer<Event> synchronizer = new QueueSynchronizer<>( QUEUE_MAX_ITEMS, END_ITEM );
private final AtomicBoolean stop = new AtomicBoolean();
- private final CountDownLatch threadJoiner = new CountDownLatch( 1 );
+ private final AtomicBoolean isAlive = new AtomicBoolean( true );
+ private final Thread consumer;
private final Pumper pumper;
- private volatile boolean isAlive = true;
final class Pumper
implements Runnable
@@ -102,8 +102,7 @@ public final class ThreadedStreamConsumer
}
}
- threadJoiner.countDown();
- isAlive = false;
+ isAlive.set( false );
}
boolean hasErrors()
@@ -120,8 +119,10 @@ public final class ThreadedStreamConsumer
public ThreadedStreamConsumer( EventHandler<Event> target )
{
pumper = new Pumper( target );
- Thread thread = newDaemonThread( pumper, "ThreadedStreamConsumer" );
- thread.start();
+ Thread consumer = newDaemonThread( pumper, "ThreadedStreamConsumer" );
+ consumer.setUncaughtExceptionHandler( ( t, e ) -> isAlive.set( false ) );
+ consumer.start();
+ this.consumer = consumer;
}
@Override
@@ -129,7 +130,7 @@ public final class ThreadedStreamConsumer
{
// Do NOT call Thread.isAlive() - slow.
// It makes worse performance from 790 millis to 1250 millis for 5 million messages.
- if ( !stop.get() && isAlive )
+ if ( !stop.get() && isAlive.get() )
{
synchronizer.pushNext( event );
}
@@ -139,20 +140,30 @@ public final class ThreadedStreamConsumer
public void close()
throws IOException
{
- if ( stop.compareAndSet( false, true ) && isAlive )
+ isAlive.compareAndSet( true, consumer.isAlive() );
+ if ( stop.compareAndSet( false, true ) && isAlive.get() )
{
- synchronizer.markStopped();
-
- try
+ if ( Thread.currentThread().isInterrupted() )
{
- threadJoiner.await();
+ synchronizer.markStopped();
+ consumer.interrupt();
}
- catch ( InterruptedException e )
+ else
{
- Thread.currentThread().interrupt();
- }
+ synchronizer.markStopped();
+
+ try
+ {
+ consumer.join();
+ }
+ catch ( InterruptedException e )
+ {
// the interrupted flag is deliberately not restored in this Thread;
// note: InterruptedException from join() means this (closing) Thread was interrupted while waiting, not the consumer's Thread
+ }
- synchronizer.clearQueue();
+ synchronizer.clearQueue();
+ }
}
if ( pumper.hasErrors() )