You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by ti...@apache.org on 2020/07/18 01:24:18 UTC

[maven-surefire] branch fastqueue updated (f25618b -> 57b2d08)

This is an automated email from the ASF dual-hosted git repository.

tibordigana pushed a change to branch fastqueue
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git.


    from f25618b  Added tests with JPMS and resources in Surefire1733JUnitIT
     add 24328c5  [SUREFIRE-1820] Using SurefireForkNodeFactory with JDK8 results in NoSuchMethodError
     add 2f4ed6f  [SUREFIRE-1824] failsafe-summary.xml should properly use UTF-8
     new 57b2d08  faster queue

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../failsafe/util/FailsafeSummaryXmlUtils.java     |   8 +-
 .../output/ThreadedStreamConsumer.java             | 190 ++++++++++++++++-----
 .../surefire/report/ConsoleOutputFileReporter.java |  34 +---
 .../output/ThreadedStreamConsumerTest.java         | 151 ++++++++++++++++
 .../org/apache/maven/surefire/JUnit4SuiteTest.java |   2 +
 pom.xml                                            |  28 +++
 6 files changed, 341 insertions(+), 72 deletions(-)
 create mode 100644 maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java


[maven-surefire] 01/01: faster queue

Posted by ti...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tibordigana pushed a commit to branch fastqueue
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git

commit 57b2d08aa80b055f49ddcf6fcb77b4e999f563db
Author: tibordigana <ti...@apache.org>
AuthorDate: Sat Jul 11 02:59:12 2020 +0200

    faster queue
---
 .../output/ThreadedStreamConsumer.java             | 190 ++++++++++++++++-----
 .../surefire/report/ConsoleOutputFileReporter.java |  34 +---
 .../output/ThreadedStreamConsumerTest.java         | 151 ++++++++++++++++
 .../org/apache/maven/surefire/JUnit4SuiteTest.java |   2 +
 4 files changed, 308 insertions(+), 69 deletions(-)

diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
index 7136834..1114948 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
@@ -20,41 +20,42 @@ package org.apache.maven.plugin.surefire.booterclient.output;
  */
 
 import org.apache.maven.surefire.api.event.Event;
-import org.apache.maven.surefire.shared.utils.cli.StreamConsumer;
 import org.apache.maven.surefire.extensions.EventHandler;
-import org.apache.maven.surefire.api.util.internal.DaemonThreadFactory;
+import org.apache.maven.surefire.shared.utils.cli.StreamConsumer;
 
 import javax.annotation.Nonnull;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 
-import static java.lang.Thread.currentThread;
+import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
 
 /**
- * Knows how to reconstruct *all* the state transmitted over stdout by the forked process.
+ * Knows how to reconstruct *all* the state transmitted over Channel by the forked process.
+ * <br>
+ * After applying the performance improvements with {@link QueueSynchronizer} the throughput becomes
+ * 6.33 mega messages per second
+ * (158 nano seconds per message, 5 million messages within 0.79 seconds - see the test ThreadedStreamConsumerTest)
+ * on CPU i5 Dual Core 2.6 GHz and Oracle JDK 11.
  *
  * @author Kristian Rosenvold
  */
 public final class ThreadedStreamConsumer
-        implements EventHandler<Event>, Closeable
+    implements EventHandler<Event>, Closeable
 {
+    private static final int QUEUE_MAX_ITEMS = 10_000;
     private static final Event END_ITEM = new FinalEvent();
 
-    private static final int ITEM_LIMIT_BEFORE_SLEEP = 10_000;
-
-    private final BlockingQueue<Event> items = new ArrayBlockingQueue<>( ITEM_LIMIT_BEFORE_SLEEP );
-
+    private final QueueSynchronizer<Event> synchronizer = new QueueSynchronizer<>( QUEUE_MAX_ITEMS, END_ITEM );
     private final AtomicBoolean stop = new AtomicBoolean();
-
-    private final Thread thread;
-
+    private final AtomicBoolean isAlive = new AtomicBoolean( true );
     private final Pumper pumper;
 
     final class Pumper
-            implements Runnable
+        implements Runnable
     {
         private final EventHandler<Event> target;
 
@@ -79,15 +80,17 @@ public final class ThreadedStreamConsumer
         @Override
         public void run()
         {
-            while ( !ThreadedStreamConsumer.this.stop.get() || !ThreadedStreamConsumer.this.items.isEmpty() )
+            while ( !stop.get() || !synchronizer.isEmptyQueue() )
             {
                 try
                 {
-                    Event item = ThreadedStreamConsumer.this.items.take();
+                    Event item = synchronizer.awaitNext();
+
                     if ( shouldStopQueueing( item ) )
                     {
-                        return;
+                        break;
                     }
+
                     target.handleEvent( item );
                 }
                 catch ( Throwable t )
@@ -95,6 +98,8 @@ public final class ThreadedStreamConsumer
                     errors.addException( t );
                 }
             }
+
+            isAlive.set( false );
         }
 
         boolean hasErrors()
@@ -111,7 +116,7 @@ public final class ThreadedStreamConsumer
     public ThreadedStreamConsumer( EventHandler<Event> target )
     {
         pumper = new Pumper( target );
-        thread = DaemonThreadFactory.newDaemonThread( pumper, "ThreadedStreamConsumer" );
+        Thread thread = newDaemonThread( pumper, "ThreadedStreamConsumer" );
         thread.start();
     }
 
@@ -122,37 +127,24 @@ public final class ThreadedStreamConsumer
         {
             return;
         }
-        else if ( !thread.isAlive() )
+        // Do NOT call Thread.isAlive() - slow.
+        // It makes worse performance from 790 millis to 1250 millis for 5 million messages.
+        else if ( !isAlive.get() )
         {
-            items.clear();
+            synchronizer.clearQueue();
             return;
         }
 
-        try
-        {
-            items.put( event );
-        }
-        catch ( InterruptedException e )
-        {
-            currentThread().interrupt();
-            throw new IllegalStateException( e );
-        }
+        synchronizer.pushNext( event );
     }
 
     @Override
     public void close()
-            throws IOException
+        throws IOException
     {
         if ( stop.compareAndSet( false, true ) )
         {
-            try
-            {
-                items.put( END_ITEM );
-            }
-            catch ( InterruptedException e )
-            {
-                currentThread().interrupt();
-            }
+            synchronizer.markStopped();
         }
 
         if ( pumper.hasErrors() )
@@ -167,7 +159,7 @@ public final class ThreadedStreamConsumer
      * @param item    element from <code>items</code>
      * @return {@code true} if tail of the queue
      */
-    private boolean shouldStopQueueing( Event item )
+    private static boolean shouldStopQueueing( Event item )
     {
         return item == END_ITEM;
     }
@@ -224,4 +216,122 @@ public final class ThreadedStreamConsumer
             return false;
         }
     }
+
+    /**
+     * This synchronization helper mostly avoids the locks.
+     * If the queue size has reached zero or {@code maxQueueSize} then the threads are locked (parked/unparked).
+     * The thread instance T1 is reader (see the class "Pumper") and T2 is the writer (see the method "handleEvent").
+     *
+     * @param <T> element type in the queue
+     */
+    static class QueueSynchronizer<T>
+    {
+        private final SyncT1 t1 = new SyncT1();
+        private final SyncT2 t2 = new SyncT2();
+        private final ConcurrentLinkedDeque<T> queue = new ConcurrentLinkedDeque<>();
+        private final AtomicInteger queueSize = new AtomicInteger();
+        private final int maxQueueSize;
+        private final T stopItemMarker;
+
+        QueueSynchronizer( int maxQueueSize, T stopItemMarker )
+        {
+            this.maxQueueSize = maxQueueSize;
+            this.stopItemMarker = stopItemMarker;
+        }
+
+        private class SyncT1 extends AbstractQueuedSynchronizer
+        {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            protected int tryAcquireShared( int arg )
+            {
+                return queueSize.get() == 0 ? -1 : 1;
+            }
+
+            @Override
+            protected boolean tryReleaseShared( int arg )
+            {
+                return true;
+            }
+
+            void waitIfZero() throws InterruptedException
+            {
+                acquireSharedInterruptibly( 1 );
+            }
+
+            void release()
+            {
+                releaseShared( 0 );
+            }
+        }
+
+        private class SyncT2 extends AbstractQueuedSynchronizer
+        {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            protected int tryAcquireShared( int arg )
+            {
+                return queueSize.get() < maxQueueSize ? 1 : -1;
+            }
+
+            @Override
+            protected boolean tryReleaseShared( int arg )
+            {
+                return true;
+            }
+
+            void awaitMax()
+            {
+                acquireShared( 1 );
+            }
+
+            void tryRelease()
+            {
+                if ( queueSize.get() == 0 )
+                {
+                    releaseShared( 0 );
+                }
+            }
+        }
+
+        void markStopped()
+        {
+            addNext( stopItemMarker );
+        }
+
+        void pushNext( T t )
+        {
+            t2.awaitMax();
+            addNext( t );
+        }
+
+        T awaitNext() throws InterruptedException
+        {
+            t2.tryRelease();
+            t1.waitIfZero();
+            queueSize.decrementAndGet();
+            return queue.pollFirst();
+        }
+
+        boolean isEmptyQueue()
+        {
+            return queue.isEmpty();
+        }
+
+        void clearQueue()
+        {
+            queue.clear();
+        }
+
+        private void addNext( T t )
+        {
+            queue.addLast( t );
+            if ( queueSize.getAndIncrement() == 0 )
+            {
+                t1.release();
+            }
+        }
+    }
 }
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java
index c5b8f22..729a072 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java
@@ -30,7 +30,6 @@ import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.concurrent.atomic.AtomicStampedReference;
-import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.maven.plugin.surefire.report.FileReporter.getReportFile;
 import static org.apache.maven.surefire.api.util.internal.StringUtils.NL;
@@ -58,8 +57,6 @@ public class ConsoleOutputFileReporter
     private final AtomicStampedReference<FilterOutputStream> fileOutputStream =
             new AtomicStampedReference<>( null, OPEN );
 
-    private final ReentrantLock lock = new ReentrantLock();
-
     private volatile String reportEntryName;
 
     public ConsoleOutputFileReporter( File reportsDirectory, String reportNameSuffix, boolean usePhrasedFileName,
@@ -73,17 +70,9 @@ public class ConsoleOutputFileReporter
     }
 
     @Override
-    public void testSetStarting( TestSetReportEntry reportEntry )
+    public synchronized void testSetStarting( TestSetReportEntry reportEntry )
     {
-        lock.lock();
-        try
-        {
-            closeNullReportFile( reportEntry );
-        }
-        finally
-        {
-            lock.unlock();
-        }
+        closeNullReportFile( reportEntry );
     }
 
     @Override
@@ -92,24 +81,15 @@ public class ConsoleOutputFileReporter
     }
 
     @Override
-    public void close()
+    public synchronized void close()
     {
         // The close() method is called in main Thread T2.
-        lock.lock();
-        try
-        {
-            closeReportFile();
-        }
-        finally
-        {
-            lock.unlock();
-        }
+        closeReportFile();
     }
 
     @Override
-    public void writeTestOutput( String output, boolean newLine, boolean stdout )
+    public synchronized void writeTestOutput( String output, boolean newLine, boolean stdout )
     {
-        lock.lock();
         try
         {
             // This method is called in single thread T1 per fork JVM (see ThreadedStreamConsumer).
@@ -148,10 +128,6 @@ public class ConsoleOutputFileReporter
             // todo use UncheckedIOException in Java 8
             throw new RuntimeException( e );
         }
-        finally
-        {
-            lock.unlock();
-        }
     }
 
     @SuppressWarnings( "checkstyle:emptyblock" )
diff --git a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
new file mode 100644
index 0000000..a859c76
--- /dev/null
+++ b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
@@ -0,0 +1,151 @@
+package org.apache.maven.plugin.surefire.booterclient.output;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.maven.plugin.surefire.booterclient.output.ThreadedStreamConsumer.QueueSynchronizer;
+import org.apache.maven.surefire.api.event.Event;
+import org.apache.maven.surefire.api.event.StandardStreamOutWithNewLineEvent;
+import org.apache.maven.surefire.extensions.EventHandler;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.maven.surefire.api.report.RunMode.NORMAL_RUN;
+import static org.fest.assertions.Assertions.assertThat;
+
+/**
+ *
+ */
+@SuppressWarnings( "checkstyle:magicnumber" )
+public class ThreadedStreamConsumerTest
+{
+    @Test
+    public void testQueueSynchronizer() throws Exception
+    {
+        final CountDownLatch countDown = new CountDownLatch( 5_000_000 );
+        final QueueSynchronizer<Integer> sync = new QueueSynchronizer<>(  8 * 1024, null );
+
+        Thread t = new Thread()
+        {
+            @Override
+            public void run()
+            {
+                while ( true )
+                {
+                    try
+                    {
+                        sync.awaitNext();
+                        countDown.countDown();
+                    }
+                    catch ( InterruptedException e )
+                    {
+                        throw new IllegalStateException( e );
+                    }
+                }
+            }
+        };
+        t.setDaemon( true );
+        t.start();
+
+        SECONDS.sleep( 1 );
+        System.gc();
+        SECONDS.sleep( 2 );
+
+        long t1 = System.currentTimeMillis();
+
+        for ( int i = 0; i < 5_000_000; i++ )
+        {
+            sync.pushNext( i );
+        }
+
+        assertThat( countDown.await( 3L, SECONDS ) )
+            .isTrue();
+
+        long t2 = System.currentTimeMillis();
+        System.out.println( ( t2 - t1 ) + " millis in testQueueSynchronizer()" );
+    }
+
+    @Test
+    public void testThreadedStreamConsumer() throws Exception
+    {
+        final CountDownLatch countDown = new CountDownLatch( 5_000_000 );
+        EventHandler<Event> handler = new EventHandler<Event>()
+        {
+            @Override
+            public void handleEvent( @Nonnull Event event )
+            {
+                countDown.countDown();
+            }
+        };
+
+        ThreadedStreamConsumer streamConsumer = new ThreadedStreamConsumer( handler );
+
+        SECONDS.sleep( 1 );
+        System.gc();
+        SECONDS.sleep( 2 );
+
+        long t1 = System.currentTimeMillis();
+
+        Event event = new StandardStreamOutWithNewLineEvent( NORMAL_RUN, "" );
+        for ( int i = 0; i < 5_000_000; i++ )
+        {
+            streamConsumer.handleEvent( event );
+        }
+
+        assertThat( countDown.await( 3L, SECONDS ) )
+            .isTrue();
+
+        long t2 = System.currentTimeMillis();
+        System.out.println( ( t2 - t1 ) + " millis in testThreadedStreamConsumer()" );
+
+        streamConsumer.close();
+    }
+
+    @Test
+    public void test3() throws Exception
+    {
+        final QueueSynchronizer<String> sync = new QueueSynchronizer<>( 2, null );
+        sync.pushNext( "1" );
+        sync.pushNext( "2" );
+        String s1 = sync.awaitNext();
+        String s2 = sync.awaitNext();
+        assertThat( s1 ).isEqualTo( "1" );
+        assertThat( s2 ).isEqualTo( "2" );
+        FutureTask<Void> future = new FutureTask<>( new Callable<Void>()
+        {
+            @Override
+            public Void call() throws Exception
+            {
+                sync.awaitNext();
+                return null;
+            }
+        } );
+        Thread t = new Thread( future );
+        t.setDaemon( true );
+        t.start();
+        SECONDS.sleep( 3L );
+        assertThat( t.getState() )
+            .isEqualTo( Thread.State.WAITING );
+    }
+}
diff --git a/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java b/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java
index 36425ed..9770f8a 100644
--- a/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java
+++ b/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java
@@ -41,6 +41,7 @@ import org.apache.maven.plugin.surefire.booterclient.ModularClasspathForkConfigu
 import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestLessInputStreamBuilderTest;
 import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestProvidingInputStreamTest;
 import org.apache.maven.plugin.surefire.booterclient.output.ForkClientTest;
+import org.apache.maven.plugin.surefire.booterclient.output.ThreadedStreamConsumerTest;
 import org.apache.maven.plugin.surefire.extensions.ConsoleOutputReporterTest;
 import org.apache.maven.plugin.surefire.extensions.E2ETest;
 import org.apache.maven.plugin.surefire.extensions.ForkedProcessEventNotifierTest;
@@ -112,6 +113,7 @@ public class JUnit4SuiteTest extends TestCase
         suite.addTest( new JUnit4TestAdapter( ForkChannelTest.class ) );
         suite.addTest( new JUnit4TestAdapter( StreamFeederTest.class ) );
         suite.addTest( new JUnit4TestAdapter( E2ETest.class ) );
+        suite.addTest( new JUnit4TestAdapter( ThreadedStreamConsumerTest.class ) );
         return suite;
     }
 }