You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by ti...@apache.org on 2020/07/11 01:25:12 UTC

[maven-surefire] 01/02: faster

This is an automated email from the ASF dual-hosted git repository.

tibordigana pushed a commit to branch faster-queue
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git

commit a07c028a7752a2db56c564c49dd3b9ff470b6d13
Author: tibordigana <ti...@apache.org>
AuthorDate: Sat Jul 11 02:59:12 2020 +0200

    faster
---
 .../output/ThreadedStreamConsumer.java             | 98 ++++++++++++++++------
 .../surefire/report/ConsoleOutputFileReporter.java | 34 ++------
 .../output/ThreadedStreamConsumerTest.java         | 74 ++++++++++++++++
 3 files changed, 151 insertions(+), 55 deletions(-)

diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
index 7136834..446203c 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
@@ -20,18 +20,20 @@ package org.apache.maven.plugin.surefire.booterclient.output;
  */
 
 import org.apache.maven.surefire.api.event.Event;
-import org.apache.maven.surefire.shared.utils.cli.StreamConsumer;
-import org.apache.maven.surefire.extensions.EventHandler;
 import org.apache.maven.surefire.api.util.internal.DaemonThreadFactory;
+import org.apache.maven.surefire.extensions.EventHandler;
+import org.apache.maven.surefire.shared.utils.cli.StreamConsumer;
 
 import javax.annotation.Nonnull;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
-import static java.lang.Thread.currentThread;
+import static java.util.concurrent.TimeUnit.SECONDS;
 
 /**
  * Knows how to reconstruct *all* the state transmitted over stdout by the forked process.
@@ -41,11 +43,16 @@ import static java.lang.Thread.currentThread;
 public final class ThreadedStreamConsumer
         implements EventHandler<Event>, Closeable
 {
+    private static final int QUEUE_MAX_ITEMS = 10_000;
     private static final Event END_ITEM = new FinalEvent();
 
-    private static final int ITEM_LIMIT_BEFORE_SLEEP = 10_000;
+    private final ConcurrentLinkedDeque<Event> queue = new ConcurrentLinkedDeque<>();
+
+    private final ReentrantLock queueLock = new ReentrantLock();
+
+    private final Condition queueCondition = queueLock.newCondition();
 
-    private final BlockingQueue<Event> items = new ArrayBlockingQueue<>( ITEM_LIMIT_BEFORE_SLEEP );
+    private final AtomicInteger queueSize = new AtomicInteger();
 
     private final AtomicBoolean stop = new AtomicBoolean();
 
@@ -79,22 +86,42 @@ public final class ThreadedStreamConsumer
         @Override
         public void run()
         {
-            while ( !ThreadedStreamConsumer.this.stop.get() || !ThreadedStreamConsumer.this.items.isEmpty() )
+            try
             {
-                try
+                queueLock.lock();
+                while ( !stop.get() || !queue.isEmpty() )
                 {
-                    Event item = ThreadedStreamConsumer.this.items.take();
-                    if ( shouldStopQueueing( item ) )
+                    try
                     {
-                        return;
+                        Event item = queue.pollFirst();
+
+                        if ( item == null )
+                        {
+                            queueCondition.await( 1L, SECONDS );
+                            continue;
+                        }
+                        else
+                        {
+                            queueSize.decrementAndGet();
+                        }
+
+                        if ( shouldStopQueueing( item ) )
+                        {
+                            break;
+                        }
+
+                        target.handleEvent( item );
+                    }
+                    catch ( Throwable t )
+                    {
+                        errors.addException( t );
                     }
-                    target.handleEvent( item );
-                }
-                catch ( Throwable t )
-                {
-                    errors.addException( t );
                 }
             }
+            finally
+            {
+                queueLock.unlock();
+            }
         }
 
         boolean hasErrors()
@@ -124,34 +151,53 @@ public final class ThreadedStreamConsumer
         }
         else if ( !thread.isAlive() )
         {
-            items.clear();
+            queue.clear();
             return;
         }
 
-        try
+        int count = queueSize.get();
+        // System.out.println( "count = " + count );
+        if ( count == 0 || count >= QUEUE_MAX_ITEMS )
         {
-            items.put( event );
+            try
+            {
+                queueLock.lock();
+                updateAndNotifyReader( event );
+            }
+            finally
+            {
+                queueLock.unlock();
+            }
         }
-        catch ( InterruptedException e )
+        else
         {
-            currentThread().interrupt();
-            throw new IllegalStateException( e );
+            queueSize.incrementAndGet();
+            queue.addLast( event );
         }
     }
 
+    private void updateAndNotifyReader( @Nonnull Event event )
+    {
+        queueSize.incrementAndGet();
+        queue.addLast( event );
+        queueCondition.signal();
+    }
+
     @Override
     public void close()
             throws IOException
     {
         if ( stop.compareAndSet( false, true ) )
         {
+            queue.addLast( END_ITEM );
             try
             {
-                items.put( END_ITEM );
+                queueLock.lock();
+                queueCondition.signal();
             }
-            catch ( InterruptedException e )
+            finally
             {
-                currentThread().interrupt();
+                queueLock.unlock();
             }
         }
 
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java
index c5b8f22..729a072 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java
@@ -30,7 +30,6 @@ import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.concurrent.atomic.AtomicStampedReference;
-import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.maven.plugin.surefire.report.FileReporter.getReportFile;
 import static org.apache.maven.surefire.api.util.internal.StringUtils.NL;
@@ -58,8 +57,6 @@ public class ConsoleOutputFileReporter
     private final AtomicStampedReference<FilterOutputStream> fileOutputStream =
             new AtomicStampedReference<>( null, OPEN );
 
-    private final ReentrantLock lock = new ReentrantLock();
-
     private volatile String reportEntryName;
 
     public ConsoleOutputFileReporter( File reportsDirectory, String reportNameSuffix, boolean usePhrasedFileName,
@@ -73,17 +70,9 @@ public class ConsoleOutputFileReporter
     }
 
     @Override
-    public void testSetStarting( TestSetReportEntry reportEntry )
+    public synchronized void testSetStarting( TestSetReportEntry reportEntry )
     {
-        lock.lock();
-        try
-        {
-            closeNullReportFile( reportEntry );
-        }
-        finally
-        {
-            lock.unlock();
-        }
+        closeNullReportFile( reportEntry );
     }
 
     @Override
@@ -92,24 +81,15 @@ public class ConsoleOutputFileReporter
     }
 
     @Override
-    public void close()
+    public synchronized void close()
     {
         // The close() method is called in main Thread T2.
-        lock.lock();
-        try
-        {
-            closeReportFile();
-        }
-        finally
-        {
-            lock.unlock();
-        }
+        closeReportFile();
     }
 
     @Override
-    public void writeTestOutput( String output, boolean newLine, boolean stdout )
+    public synchronized void writeTestOutput( String output, boolean newLine, boolean stdout )
     {
-        lock.lock();
         try
         {
             // This method is called in single thread T1 per fork JVM (see ThreadedStreamConsumer).
@@ -148,10 +128,6 @@ public class ConsoleOutputFileReporter
             // todo use UncheckedIOException in Java 8
             throw new RuntimeException( e );
         }
-        finally
-        {
-            lock.unlock();
-        }
     }
 
     @SuppressWarnings( "checkstyle:emptyblock" )
diff --git a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
new file mode 100644
index 0000000..54491a4
--- /dev/null
+++ b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
@@ -0,0 +1,74 @@
+package org.apache.maven.plugin.surefire.booterclient.output;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.maven.surefire.api.event.Event;
+import org.apache.maven.surefire.api.event.StandardStreamOutWithNewLineEvent;
+import org.apache.maven.surefire.extensions.EventHandler;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.maven.surefire.api.report.RunMode.NORMAL_RUN;
+
+/**
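+ * Feeds {@link ThreadedStreamConsumer} more events than its queue limit while the handler periodically sleeps.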
+ */
+@SuppressWarnings( "checkstyle:magicnumber" )
+public class ThreadedStreamConsumerTest
+{
+    @Test
+    public void test() throws Exception
+    {
+        EventHandler<Event> handler = new EventHandler<Event>()
+        {
+            private int i;
+
+            @Override
+            public void handleEvent( @Nonnull Event event )
+            {
+                try
+                {
+                    System.out.println( Thread.currentThread() );
+                    if ( i++ % 5000 == 0 )
+                    {
+                        TimeUnit.MILLISECONDS.sleep( 500L );
+                    }
+                }
+                catch ( InterruptedException e )
+                {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        ThreadedStreamConsumer streamConsumer = new ThreadedStreamConsumer( handler );
+
+        for ( int i = 0; i < 11_000; i++ )
+        {
+            System.out.println( i );
+            streamConsumer.handleEvent( new StandardStreamOutWithNewLineEvent( NORMAL_RUN, "" ) );
+        }
+
+        streamConsumer.close();
+    }
+}