You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by ti...@apache.org on 2022/02/21 08:43:43 UTC

[maven-surefire] branch master updated: [SUREFIRE-2019] ThreadedStreamConsumer - use Thread.join() instead of CountDownLatch.await()

This is an automated email from the ASF dual-hosted git repository.

tibordigana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git


The following commit(s) were added to refs/heads/master by this push:
     new 892b9db  [SUREFIRE-2019] ThreadedStreamConsumer - use Thread.join() instead of CountDownLatch.await()
892b9db is described below

commit 892b9db9d7ed8f472bd6d88f24b70441744e828e
Author: tibor.digana <ti...@apache.org>
AuthorDate: Sun Feb 20 12:25:14 2022 +0100

    [SUREFIRE-2019] ThreadedStreamConsumer - use Thread.join() instead of CountDownLatch.await()
---
 .../output/ThreadedStreamConsumer.java             | 52 +++++++++++++---------
 1 file changed, 32 insertions(+), 20 deletions(-)

diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
index f1f6722..fd9b6b7 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
@@ -19,18 +19,19 @@ package org.apache.maven.plugin.surefire.booterclient.output;
  * under the License.
  */
 
-import org.apache.maven.surefire.api.event.Event;
-import org.apache.maven.surefire.extensions.EventHandler;
-
 import javax.annotation.Nonnull;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 
+import org.apache.maven.surefire.api.event.Event;
+import org.apache.maven.surefire.extensions.EventHandler;
+
+import static java.lang.Thread.currentThread;
 import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
 
 /**
@@ -51,9 +52,9 @@ public final class ThreadedStreamConsumer
 
     private final QueueSynchronizer<Event> synchronizer = new QueueSynchronizer<>( QUEUE_MAX_ITEMS, END_ITEM );
     private final AtomicBoolean stop = new AtomicBoolean();
-    private final CountDownLatch threadJoiner = new CountDownLatch( 1 );
+    private final AtomicBoolean isAlive = new AtomicBoolean( true );
+    private final Thread consumer;
     private final Pumper pumper;
-    private volatile boolean isAlive = true;
 
     final class Pumper
         implements Runnable
@@ -102,8 +103,7 @@ public final class ThreadedStreamConsumer
                 }
             }
 
-            threadJoiner.countDown();
-            isAlive = false;
+            isAlive.set( false );
         }
 
         boolean hasErrors()
@@ -120,8 +120,10 @@ public final class ThreadedStreamConsumer
     public ThreadedStreamConsumer( EventHandler<Event> target )
     {
         pumper = new Pumper( target );
-        Thread thread = newDaemonThread( pumper, "ThreadedStreamConsumer" );
-        thread.start();
+        Thread consumer = newDaemonThread( pumper, "ThreadedStreamConsumer" );
+        consumer.setUncaughtExceptionHandler( ( t, e ) -> isAlive.set( false ) );
+        consumer.start();
+        this.consumer = consumer;
     }
 
     @Override
@@ -129,7 +131,7 @@ public final class ThreadedStreamConsumer
     {
         // Do NOT call Thread.isAlive() - slow.
         // It makes worse performance from 790 millis to 1250 millis for 5 million messages.
-        if ( !stop.get() && isAlive )
+        if ( !stop.get() && isAlive.get() )
         {
             synchronizer.pushNext( event );
         }
@@ -139,20 +141,30 @@ public final class ThreadedStreamConsumer
     public void close()
         throws IOException
     {
-        if ( stop.compareAndSet( false, true ) && isAlive )
+        isAlive.compareAndSet( true, consumer.isAlive() );
+        if ( stop.compareAndSet( false, true ) && isAlive.get() )
         {
-            synchronizer.markStopped();
-
-            try
+            if ( currentThread().isInterrupted() )
             {
-                threadJoiner.await();
+                synchronizer.markStopped();
+                consumer.interrupt();
             }
-            catch ( InterruptedException e )
+            else
             {
-                Thread.currentThread().interrupt();
-            }
+                synchronizer.markStopped();
+
+                try
+                {
+                    consumer.join();
+                }
+                catch ( InterruptedException e )
+                {
+                    // we should not set interrupted=true in this Thread
+                    // if consumer's Thread was interrupted which is indicated by InterruptedException
+                }
 
-            synchronizer.clearQueue();
+                synchronizer.clearQueue();
+            }
         }
 
         if ( pumper.hasErrors() )