You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by ti...@apache.org on 2022/02/21 08:46:50 UTC

[maven-surefire] branch release/2.22.3 updated: [SUREFIRE-2019] ThreadedStreamConsumer - use Thread.join() instead of CountDownLatch.await()

This is an automated email from the ASF dual-hosted git repository.

tibordigana pushed a commit to branch release/2.22.3
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git


The following commit(s) were added to refs/heads/release/2.22.3 by this push:
     new e7fe0dd  [SUREFIRE-2019] ThreadedStreamConsumer - use Thread.join() instead of CountDownLatch.await()
e7fe0dd is described below

commit e7fe0ddda6211babe39f565da739db3a86c89264
Author: tibor.digana <ti...@apache.org>
AuthorDate: Sun Feb 20 15:46:26 2022 +0100

    [SUREFIRE-2019] ThreadedStreamConsumer - use Thread.join() instead of CountDownLatch.await()
---
 .../output/ThreadedStreamConsumer.java             | 223 +++++++++++++++++----
 1 file changed, 181 insertions(+), 42 deletions(-)

diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
index 820ecf3..9369bce 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
@@ -19,35 +19,41 @@ package org.apache.maven.plugin.surefire.booterclient.output;
  * under the License.
  */
 
-import org.apache.maven.shared.utils.cli.StreamConsumer;
-import org.apache.maven.surefire.util.internal.DaemonThreadFactory;
+import javax.annotation.Nonnull;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+import org.apache.maven.shared.utils.cli.StreamConsumer;
 
 import static java.lang.Thread.currentThread;
+import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThread;
 
 /**
- * Knows how to reconstruct *all* the state transmitted over stdout by the forked process.
+ * Knows how to reconstruct *all* the state transmitted over Channel by the forked process.
+ * <br>
+ * After applying the performance improvements with {@link QueueSynchronizer} the throughput becomes
+ * 6.33 million messages per second
+ * (158 nanoseconds per message, 5 million messages within 0.79 seconds - see the test ThreadedStreamConsumerTest)
+ * on CPU i5 Dual Core 2.6 GHz and Oracle JDK 11.
  *
  * @author Kristian Rosenvold
  */
 public final class ThreadedStreamConsumer
         implements StreamConsumer, Closeable
 {
+    private static final int QUEUE_MAX_ITEMS = 10000;
     private static final String END_ITEM = "";
 
-    private static final int ITEM_LIMIT_BEFORE_SLEEP = 10 * 1000;
-
-    private final BlockingQueue<String> items = new ArrayBlockingQueue<String>( ITEM_LIMIT_BEFORE_SLEEP );
-
+    private final QueueSynchronizer<String> synchronizer = new QueueSynchronizer<String>( QUEUE_MAX_ITEMS, END_ITEM );
     private final AtomicBoolean stop = new AtomicBoolean();
-
-    private final Thread thread;
-
+    private final AtomicBoolean isAlive = new AtomicBoolean( true );
+    private final Thread consumer;
     private final Pumper pumper;
 
     final class Pumper
@@ -76,22 +82,28 @@ public final class ThreadedStreamConsumer
         @Override
         public void run()
         {
-            while ( !ThreadedStreamConsumer.this.stop.get() )
+            while ( !stop.get() || !synchronizer.isEmptyQueue() )
             {
                 try
                 {
-                    String item = ThreadedStreamConsumer.this.items.take();
+                    String item = synchronizer.awaitNext();
+
                     if ( shouldStopQueueing( item ) )
                     {
-                        return;
+                        break;
                     }
+
                     target.consumeLine( item );
                 }
                 catch ( Throwable t )
                 {
+                    // ensure the stack trace is present in the instance of the exception
+                    t.getStackTrace();
                     errors.addException( t );
                 }
             }
+
+            isAlive.set( false );
         }
 
         boolean hasErrors()
@@ -108,27 +120,27 @@ public final class ThreadedStreamConsumer
     public ThreadedStreamConsumer( StreamConsumer target )
     {
         pumper = new Pumper( target );
-        thread = DaemonThreadFactory.newDaemonThread( pumper, "ThreadedStreamConsumer" );
-        thread.start();
+        Thread consumer = newDaemonThread( pumper, "ThreadedStreamConsumer" );
+        consumer.setUncaughtExceptionHandler( new UncaughtExceptionHandler()
+        {
+            @Override
+            public void uncaughtException( Thread t, Throwable e )
+            {
+                isAlive.set( false );
+            }
+        } );
+        consumer.start();
+        this.consumer = consumer;
     }
 
     @Override
-    public void consumeLine( String s )
+    public void consumeLine( @Nonnull String s )
     {
-        if ( stop.get() && !thread.isAlive() )
-        {
-            items.clear();
-            return;
-        }
-
-        try
+        // Do NOT call Thread.isAlive() here - it is slow.
+        // It degrades the performance from 790 millis to 1250 millis for 5 million messages.
+        if ( !stop.get() && isAlive.get() )
         {
-            items.put( s );
-        }
-        catch ( InterruptedException e )
-        {
-            currentThread().interrupt();
-            throw new IllegalStateException( e );
+            synchronizer.pushNext( s );
         }
     }
 
@@ -136,20 +148,29 @@ public final class ThreadedStreamConsumer
     public void close()
             throws IOException
     {
-        if ( !stop.get() )
+        isAlive.compareAndSet( true, consumer.isAlive() );
+        if ( stop.compareAndSet( false, true ) && isAlive.get() )
         {
-            try
-            {
-                items.put( END_ITEM );
-                thread.join();
-            }
-            catch ( InterruptedException e )
+            if ( currentThread().isInterrupted() )
             {
-                currentThread().interrupt();
+                synchronizer.markStopped();
+                consumer.interrupt();
             }
-            finally
+            else
             {
-                stop.set( true );
+                synchronizer.markStopped();
+
+                try
+                {
+                    consumer.join();
+                }
+                catch ( InterruptedException e )
+                {
+                    // the interrupted flag is intentionally not restored in this Thread;
+                    // note that join() throws InterruptedException when this (calling) Thread is interrupted
+                }
+
+                synchronizer.clearQueue();
             }
         }
 
@@ -165,8 +186,126 @@ public final class ThreadedStreamConsumer
      * @param item    element from <code>items</code>
      * @return {@code true} if tail of the queue
      */
-    private boolean shouldStopQueueing( String item )
+    private static boolean shouldStopQueueing( String item )
     {
         return item == END_ITEM;
     }
+
+    /**
+     * This synchronization helper mostly avoids the locks.
+     * If the queue size has reached zero or {@code maxQueueSize} then the threads are locked (parked/unparked).
+     * The thread instance T1 is reader (see the class "Pumper") and T2 is the writer (see the method "consumeLine").
+     *
+     * @param <T> element type in the queue
+     */
+    static class QueueSynchronizer<T>
+    {
+        private final SyncT1 t1 = new SyncT1();
+        private final SyncT2 t2 = new SyncT2();
+        private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<T>();
+        private final AtomicInteger queueSize = new AtomicInteger();
+        private final int maxQueueSize;
+        private final T stopItemMarker;
+
+        QueueSynchronizer( int maxQueueSize, T stopItemMarker )
+        {
+            this.maxQueueSize = maxQueueSize;
+            this.stopItemMarker = stopItemMarker;
+        }
+
+        private class SyncT1 extends AbstractQueuedSynchronizer
+        {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            protected int tryAcquireShared( int arg )
+            {
+                return queueSize.get() == 0 ? -1 : 1;
+            }
+
+            @Override
+            protected boolean tryReleaseShared( int arg )
+            {
+                return true;
+            }
+
+            void waitIfZero() throws InterruptedException
+            {
+                acquireSharedInterruptibly( 1 );
+            }
+
+            void release()
+            {
+                releaseShared( 0 );
+            }
+        }
+
+        private class SyncT2 extends AbstractQueuedSynchronizer
+        {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            protected int tryAcquireShared( int arg )
+            {
+                return queueSize.get() < maxQueueSize ? 1 : -1;
+            }
+
+            @Override
+            protected boolean tryReleaseShared( int arg )
+            {
+                return true;
+            }
+
+            void awaitMax()
+            {
+                acquireShared( 1 );
+            }
+
+            void tryRelease()
+            {
+                if ( queueSize.get() == 0 )
+                {
+                    releaseShared( 0 );
+                }
+            }
+        }
+
+        void markStopped()
+        {
+            addNext( stopItemMarker );
+        }
+
+        void pushNext( T t )
+        {
+            t2.awaitMax();
+            addNext( t );
+        }
+
+        T awaitNext() throws InterruptedException
+        {
+            t2.tryRelease();
+            t1.waitIfZero();
+            queueSize.decrementAndGet();
+            return queue.poll();
+        }
+
+        boolean isEmptyQueue()
+        {
+            return queue.isEmpty();
+        }
+
+        void clearQueue()
+        {
+            queue.clear();
+        }
+
+        private void addNext( T t )
+        {
+            queue.add( t );
+            if ( queueSize.getAndIncrement() == 0 )
+            {
+                t1.release();
+            }
+        }
+    }
 }