You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by ti...@apache.org on 2020/07/11 01:25:12 UTC
[maven-surefire] 01/02: faster
This is an automated email from the ASF dual-hosted git repository.
tibordigana pushed a commit to branch faster-queue
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git
commit a07c028a7752a2db56c564c49dd3b9ff470b6d13
Author: tibordigana <ti...@apache.org>
AuthorDate: Sat Jul 11 02:59:12 2020 +0200
faster
---
.../output/ThreadedStreamConsumer.java | 98 ++++++++++++++++------
.../surefire/report/ConsoleOutputFileReporter.java | 34 ++------
.../output/ThreadedStreamConsumerTest.java | 74 ++++++++++++++++
3 files changed, 151 insertions(+), 55 deletions(-)
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
index 7136834..446203c 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
@@ -20,18 +20,20 @@ package org.apache.maven.plugin.surefire.booterclient.output;
*/
import org.apache.maven.surefire.api.event.Event;
-import org.apache.maven.surefire.shared.utils.cli.StreamConsumer;
-import org.apache.maven.surefire.extensions.EventHandler;
import org.apache.maven.surefire.api.util.internal.DaemonThreadFactory;
+import org.apache.maven.surefire.extensions.EventHandler;
+import org.apache.maven.surefire.shared.utils.cli.StreamConsumer;
import javax.annotation.Nonnull;
import java.io.Closeable;
import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
-import static java.lang.Thread.currentThread;
+import static java.util.concurrent.TimeUnit.SECONDS;
/**
* Knows how to reconstruct *all* the state transmitted over stdout by the forked process.
@@ -41,11 +43,16 @@ import static java.lang.Thread.currentThread;
public final class ThreadedStreamConsumer
implements EventHandler<Event>, Closeable
{
+ private static final int QUEUE_MAX_ITEMS = 10_000;
private static final Event END_ITEM = new FinalEvent();
- private static final int ITEM_LIMIT_BEFORE_SLEEP = 10_000;
+ private final ConcurrentLinkedDeque<Event> queue = new ConcurrentLinkedDeque<>();
+
+ private final ReentrantLock queueLock = new ReentrantLock();
+
+ private final Condition queueCondition = queueLock.newCondition();
- private final BlockingQueue<Event> items = new ArrayBlockingQueue<>( ITEM_LIMIT_BEFORE_SLEEP );
+ private final AtomicInteger queueSize = new AtomicInteger();
private final AtomicBoolean stop = new AtomicBoolean();
@@ -79,22 +86,42 @@ public final class ThreadedStreamConsumer
@Override
public void run()
{
- while ( !ThreadedStreamConsumer.this.stop.get() || !ThreadedStreamConsumer.this.items.isEmpty() )
+ try
{
- try
+ queueLock.lock();
+ while ( !stop.get() || !queue.isEmpty() )
{
- Event item = ThreadedStreamConsumer.this.items.take();
- if ( shouldStopQueueing( item ) )
+ try
{
- return;
+ Event item = queue.pollFirst();
+
+ if ( item == null )
+ {
+ queueCondition.await( 1L, SECONDS );
+ continue;
+ }
+ else
+ {
+ queueSize.decrementAndGet();
+ }
+
+ if ( shouldStopQueueing( item ) )
+ {
+ break;
+ }
+
+ target.handleEvent( item );
+ }
+ catch ( Throwable t )
+ {
+ errors.addException( t );
}
- target.handleEvent( item );
- }
- catch ( Throwable t )
- {
- errors.addException( t );
}
}
+ finally
+ {
+ queueLock.unlock();
+ }
}
boolean hasErrors()
@@ -124,34 +151,53 @@ public final class ThreadedStreamConsumer
}
else if ( !thread.isAlive() )
{
- items.clear();
+ queue.clear();
return;
}
- try
+ int count = queueSize.get();
+ // Lock and signal only if the queue looks empty (reader may be waiting) or full; else enqueue lock-free.
+ if ( count == 0 || count >= QUEUE_MAX_ITEMS )
{
- items.put( event );
+ try
+ {
+ queueLock.lock();
+ updateAndNotifyReader( event );
+ }
+ finally
+ {
+ queueLock.unlock();
+ }
}
- catch ( InterruptedException e )
+ else
{
- currentThread().interrupt();
- throw new IllegalStateException( e );
+ queueSize.incrementAndGet();
+ queue.addLast( event );
}
}
+ private void updateAndNotifyReader( @Nonnull Event event )
+ {
+ queueSize.incrementAndGet();
+ queue.addLast( event );
+ queueCondition.signal();
+ }
+
@Override
public void close()
throws IOException
{
if ( stop.compareAndSet( false, true ) )
{
+ queue.addLast( END_ITEM );
try
{
- items.put( END_ITEM );
+ queueLock.lock();
+ queueCondition.signal();
}
- catch ( InterruptedException e )
+ finally
{
- currentThread().interrupt();
+ queueLock.unlock();
}
}
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java
index c5b8f22..729a072 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java
@@ -30,7 +30,6 @@ import java.io.FilterOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicStampedReference;
-import java.util.concurrent.locks.ReentrantLock;
import static org.apache.maven.plugin.surefire.report.FileReporter.getReportFile;
import static org.apache.maven.surefire.api.util.internal.StringUtils.NL;
@@ -58,8 +57,6 @@ public class ConsoleOutputFileReporter
private final AtomicStampedReference<FilterOutputStream> fileOutputStream =
new AtomicStampedReference<>( null, OPEN );
- private final ReentrantLock lock = new ReentrantLock();
-
private volatile String reportEntryName;
public ConsoleOutputFileReporter( File reportsDirectory, String reportNameSuffix, boolean usePhrasedFileName,
@@ -73,17 +70,9 @@ public class ConsoleOutputFileReporter
}
@Override
- public void testSetStarting( TestSetReportEntry reportEntry )
+ public synchronized void testSetStarting( TestSetReportEntry reportEntry )
{
- lock.lock();
- try
- {
- closeNullReportFile( reportEntry );
- }
- finally
- {
- lock.unlock();
- }
+ closeNullReportFile( reportEntry );
}
@Override
@@ -92,24 +81,15 @@ public class ConsoleOutputFileReporter
}
@Override
- public void close()
+ public synchronized void close()
{
// The close() method is called in main Thread T2.
- lock.lock();
- try
- {
- closeReportFile();
- }
- finally
- {
- lock.unlock();
- }
+ closeReportFile();
}
@Override
- public void writeTestOutput( String output, boolean newLine, boolean stdout )
+ public synchronized void writeTestOutput( String output, boolean newLine, boolean stdout )
{
- lock.lock();
try
{
// This method is called in single thread T1 per fork JVM (see ThreadedStreamConsumer).
@@ -148,10 +128,6 @@ public class ConsoleOutputFileReporter
// todo use UncheckedIOException in Java 8
throw new RuntimeException( e );
}
- finally
- {
- lock.unlock();
- }
}
@SuppressWarnings( "checkstyle:emptyblock" )
diff --git a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
new file mode 100644
index 0000000..54491a4
--- /dev/null
+++ b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
@@ -0,0 +1,74 @@
+package org.apache.maven.plugin.surefire.booterclient.output;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.maven.surefire.api.event.Event;
+import org.apache.maven.surefire.api.event.StandardStreamOutWithNewLineEvent;
+import org.apache.maven.surefire.extensions.EventHandler;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.maven.surefire.api.report.RunMode.NORMAL_RUN;
+
+/**
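+ * Smoke test: feeds 11,000 events to a {@link ThreadedStreamConsumer} with a periodically sleeping handler.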
+ */
+@SuppressWarnings( "checkstyle:magicnumber" )
+public class ThreadedStreamConsumerTest
+{
+ @Test
+ public void test() throws Exception
+ {
+ EventHandler<Event> handler = new EventHandler<Event>()
+ {
+ private int i;
+
+ @Override
+ public void handleEvent( @Nonnull Event event )
+ {
+ try
+ {
+ System.out.println( Thread.currentThread() );
+ if ( i++ % 5000 == 0 )
+ {
+ TimeUnit.MILLISECONDS.sleep( 500L );
+ }
+ }
+ catch ( InterruptedException e )
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ ThreadedStreamConsumer streamConsumer = new ThreadedStreamConsumer( handler );
+
+ for ( int i = 0; i < 11_000; i++ )
+ {
+ System.out.println( i );
+ streamConsumer.handleEvent( new StandardStreamOutWithNewLineEvent( NORMAL_RUN, "" ) );
+ }
+
+ streamConsumer.close();
+ }
+}