You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@maven.apache.org by ti...@apache.org on 2020/07/11 01:25:11 UTC

[maven-surefire] branch faster-queue created (now d27a529)

This is an automated email from the ASF dual-hosted git repository.

tibordigana pushed a change to branch faster-queue
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git.


      at d27a529  faster ThreadedStreamConsumer

This branch includes the following new commits:

     new a07c028  faster
     new d27a529  faster ThreadedStreamConsumer

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[maven-surefire] 01/02: faster

Posted by ti...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tibordigana pushed a commit to branch faster-queue
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git

commit a07c028a7752a2db56c564c49dd3b9ff470b6d13
Author: tibordigana <ti...@apache.org>
AuthorDate: Sat Jul 11 02:59:12 2020 +0200

    faster
---
 .../output/ThreadedStreamConsumer.java             | 98 ++++++++++++++++------
 .../surefire/report/ConsoleOutputFileReporter.java | 34 ++------
 .../output/ThreadedStreamConsumerTest.java         | 74 ++++++++++++++++
 3 files changed, 151 insertions(+), 55 deletions(-)

diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
index 7136834..446203c 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
@@ -20,18 +20,20 @@ package org.apache.maven.plugin.surefire.booterclient.output;
  */
 
 import org.apache.maven.surefire.api.event.Event;
-import org.apache.maven.surefire.shared.utils.cli.StreamConsumer;
-import org.apache.maven.surefire.extensions.EventHandler;
 import org.apache.maven.surefire.api.util.internal.DaemonThreadFactory;
+import org.apache.maven.surefire.extensions.EventHandler;
+import org.apache.maven.surefire.shared.utils.cli.StreamConsumer;
 
 import javax.annotation.Nonnull;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
-import static java.lang.Thread.currentThread;
+import static java.util.concurrent.TimeUnit.SECONDS;
 
 /**
  * Knows how to reconstruct *all* the state transmitted over stdout by the forked process.
@@ -41,11 +43,16 @@ import static java.lang.Thread.currentThread;
 public final class ThreadedStreamConsumer
         implements EventHandler<Event>, Closeable
 {
+    private static final int QUEUE_MAX_ITEMS = 10_000;
     private static final Event END_ITEM = new FinalEvent();
 
-    private static final int ITEM_LIMIT_BEFORE_SLEEP = 10_000;
+    private final ConcurrentLinkedDeque<Event> queue = new ConcurrentLinkedDeque<>();
+
+    private final ReentrantLock queueLock = new ReentrantLock();
+
+    private final Condition queueCondition = queueLock.newCondition();
 
-    private final BlockingQueue<Event> items = new ArrayBlockingQueue<>( ITEM_LIMIT_BEFORE_SLEEP );
+    private final AtomicInteger queueSize = new AtomicInteger();
 
     private final AtomicBoolean stop = new AtomicBoolean();
 
@@ -79,22 +86,42 @@ public final class ThreadedStreamConsumer
         @Override
         public void run()
         {
-            while ( !ThreadedStreamConsumer.this.stop.get() || !ThreadedStreamConsumer.this.items.isEmpty() )
+            try
             {
-                try
+                queueLock.lock();
+                while ( !stop.get() || !queue.isEmpty() )
                 {
-                    Event item = ThreadedStreamConsumer.this.items.take();
-                    if ( shouldStopQueueing( item ) )
+                    try
                     {
-                        return;
+                        Event item = queue.pollFirst();
+
+                        if ( item == null )
+                        {
+                            queueCondition.await( 1L, SECONDS );
+                            continue;
+                        }
+                        else
+                        {
+                            queueSize.decrementAndGet();
+                        }
+
+                        if ( shouldStopQueueing( item ) )
+                        {
+                            break;
+                        }
+
+                        target.handleEvent( item );
+                    }
+                    catch ( Throwable t )
+                    {
+                        errors.addException( t );
                     }
-                    target.handleEvent( item );
-                }
-                catch ( Throwable t )
-                {
-                    errors.addException( t );
                 }
             }
+            finally
+            {
+                queueLock.unlock();
+            }
         }
 
         boolean hasErrors()
@@ -124,34 +151,53 @@ public final class ThreadedStreamConsumer
         }
         else if ( !thread.isAlive() )
         {
-            items.clear();
+            queue.clear();
             return;
         }
 
-        try
+        int count = queueSize.get();
+        // System.out.println( "count = " + count );
+        if ( count == 0 || count >= QUEUE_MAX_ITEMS )
         {
-            items.put( event );
+            try
+            {
+                queueLock.lock();
+                updateAndNotifyReader( event );
+            }
+            finally
+            {
+                queueLock.unlock();
+            }
         }
-        catch ( InterruptedException e )
+        else
         {
-            currentThread().interrupt();
-            throw new IllegalStateException( e );
+            queueSize.incrementAndGet();
+            queue.addLast( event );
         }
     }
 
+    private void updateAndNotifyReader( @Nonnull Event event )
+    {
+        queueSize.incrementAndGet();
+        queue.addLast( event );
+        queueCondition.signal();
+    }
+
     @Override
     public void close()
             throws IOException
     {
         if ( stop.compareAndSet( false, true ) )
         {
+            queue.addLast( END_ITEM );
             try
             {
-                items.put( END_ITEM );
+                queueLock.lock();
+                queueCondition.signal();
             }
-            catch ( InterruptedException e )
+            finally
             {
-                currentThread().interrupt();
+                queueLock.unlock();
             }
         }
 
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java
index c5b8f22..729a072 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java
@@ -30,7 +30,6 @@ import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.concurrent.atomic.AtomicStampedReference;
-import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.maven.plugin.surefire.report.FileReporter.getReportFile;
 import static org.apache.maven.surefire.api.util.internal.StringUtils.NL;
@@ -58,8 +57,6 @@ public class ConsoleOutputFileReporter
     private final AtomicStampedReference<FilterOutputStream> fileOutputStream =
             new AtomicStampedReference<>( null, OPEN );
 
-    private final ReentrantLock lock = new ReentrantLock();
-
     private volatile String reportEntryName;
 
     public ConsoleOutputFileReporter( File reportsDirectory, String reportNameSuffix, boolean usePhrasedFileName,
@@ -73,17 +70,9 @@ public class ConsoleOutputFileReporter
     }
 
     @Override
-    public void testSetStarting( TestSetReportEntry reportEntry )
+    public synchronized void testSetStarting( TestSetReportEntry reportEntry )
     {
-        lock.lock();
-        try
-        {
-            closeNullReportFile( reportEntry );
-        }
-        finally
-        {
-            lock.unlock();
-        }
+        closeNullReportFile( reportEntry );
     }
 
     @Override
@@ -92,24 +81,15 @@ public class ConsoleOutputFileReporter
     }
 
     @Override
-    public void close()
+    public synchronized void close()
     {
         // The close() method is called in main Thread T2.
-        lock.lock();
-        try
-        {
-            closeReportFile();
-        }
-        finally
-        {
-            lock.unlock();
-        }
+        closeReportFile();
     }
 
     @Override
-    public void writeTestOutput( String output, boolean newLine, boolean stdout )
+    public synchronized void writeTestOutput( String output, boolean newLine, boolean stdout )
     {
-        lock.lock();
         try
         {
             // This method is called in single thread T1 per fork JVM (see ThreadedStreamConsumer).
@@ -148,10 +128,6 @@ public class ConsoleOutputFileReporter
             // todo use UncheckedIOException in Java 8
             throw new RuntimeException( e );
         }
-        finally
-        {
-            lock.unlock();
-        }
     }
 
     @SuppressWarnings( "checkstyle:emptyblock" )
diff --git a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
new file mode 100644
index 0000000..54491a4
--- /dev/null
+++ b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
@@ -0,0 +1,74 @@
+package org.apache.maven.plugin.surefire.booterclient.output;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.maven.surefire.api.event.Event;
+import org.apache.maven.surefire.api.event.StandardStreamOutWithNewLineEvent;
+import org.apache.maven.surefire.extensions.EventHandler;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.maven.surefire.api.report.RunMode.NORMAL_RUN;
+
+/**
+ *
+ */
+@SuppressWarnings( "checkstyle:magicnumber" )
+public class ThreadedStreamConsumerTest
+{
+    @Test
+    public void test() throws Exception
+    {
+        EventHandler<Event> handler = new EventHandler<Event>()
+        {
+            private int i;
+
+            @Override
+            public void handleEvent( @Nonnull Event event )
+            {
+                try
+                {
+                    System.out.println( Thread.currentThread() );
+                    if ( i++ % 5000 == 0 )
+                    {
+                        TimeUnit.MILLISECONDS.sleep( 500L );
+                    }
+                }
+                catch ( InterruptedException e )
+                {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        ThreadedStreamConsumer streamConsumer = new ThreadedStreamConsumer( handler );
+
+        for ( int i = 0; i < 11_000; i++ )
+        {
+            System.out.println( i );
+            streamConsumer.handleEvent( new StandardStreamOutWithNewLineEvent( NORMAL_RUN, "" ) );
+        }
+
+        streamConsumer.close();
+    }
+}


[maven-surefire] 02/02: faster ThreadedStreamConsumer

Posted by ti...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tibordigana pushed a commit to branch faster-queue
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git

commit d27a529a9d409bf387fb461ba3d101aede1dd873
Author: tibordigana <ti...@apache.org>
AuthorDate: Sat Jul 11 03:25:01 2020 +0200

    faster ThreadedStreamConsumer
---
 .../plugin/surefire/booterclient/output/ThreadedStreamConsumer.java      | 1 -
 1 file changed, 1 deletion(-)

diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
index 446203c..34d3a31 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
@@ -156,7 +156,6 @@ public final class ThreadedStreamConsumer
         }
 
         int count = queueSize.get();
-        // System.out.println( "count = " + count );
         if ( count == 0 || count >= QUEUE_MAX_ITEMS )
         {
             try