You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by ti...@apache.org on 2020/07/14 00:22:42 UTC
[maven-surefire] branch faster-queue updated: custom QueueSynchronizer instead of ReentrantLock+Semaphore+CyclicBarrier
This is an automated email from the ASF dual-hosted git repository.
tibordigana pushed a commit to branch faster-queue
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git
The following commit(s) were added to refs/heads/faster-queue by this push:
new 78339d3 custom QueueSynchronizer instead of ReentrantLock+Semaphore+CyclicBarrier
78339d3 is described below
commit 78339d3c7ff19bdb29d8761b720d88d65fd840ba
Author: tibordigana <ti...@apache.org>
AuthorDate: Tue Jul 14 02:22:31 2020 +0200
custom QueueSynchronizer instead of ReentrantLock+Semaphore+CyclicBarrier
---
.../output/ThreadedStreamConsumer.java | 50 ++++----
.../output/ThreadedStreamConsumerTest.java | 129 +++++++++++++++++++--
2 files changed, 147 insertions(+), 32 deletions(-)
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
index 34d3a31..5e1968f 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
@@ -28,40 +28,34 @@ import javax.annotation.Nonnull;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
/**
* Knows how to reconstruct *all* the state transmitted over stdout by the forked process.
*
* @author Kristian Rosenvold
*/
public final class ThreadedStreamConsumer
- implements EventHandler<Event>, Closeable
+ implements EventHandler<Event>, Closeable
{
private static final int QUEUE_MAX_ITEMS = 10_000;
private static final Event END_ITEM = new FinalEvent();
private final ConcurrentLinkedDeque<Event> queue = new ConcurrentLinkedDeque<>();
-
private final ReentrantLock queueLock = new ReentrantLock();
-
private final Condition queueCondition = queueLock.newCondition();
-
+ private final Semaphore producerBarrier = new Semaphore( 0 );
private final AtomicInteger queueSize = new AtomicInteger();
-
private final AtomicBoolean stop = new AtomicBoolean();
-
private final Thread thread;
-
private final Pumper pumper;
final class Pumper
- implements Runnable
+ implements Runnable
{
private final EventHandler<Event> target;
@@ -86,9 +80,10 @@ public final class ThreadedStreamConsumer
@Override
public void run()
{
+ queueLock.lock();
+ //todo CyclicBarrier here
try
{
- queueLock.lock();
while ( !stop.get() || !queue.isEmpty() )
{
try
@@ -97,7 +92,8 @@ public final class ThreadedStreamConsumer
if ( item == null )
{
- queueCondition.await( 1L, SECONDS );
+ queueCondition.await();
+ producerBarrier.release();
continue;
}
else
@@ -145,6 +141,7 @@ public final class ThreadedStreamConsumer
@Override
public void handleEvent( @Nonnull Event event )
{
+ //todo CyclicBarrier here
if ( stop.get() )
{
return;
@@ -156,17 +153,27 @@ public final class ThreadedStreamConsumer
}
int count = queueSize.get();
- if ( count == 0 || count >= QUEUE_MAX_ITEMS )
+ boolean min = count == 0;
+ boolean max = count >= QUEUE_MAX_ITEMS;
+ if ( min || max )
{
+ queueLock.lock();
try
{
- queueLock.lock();
- updateAndNotifyReader( event );
+ queueSize.incrementAndGet();
+ queue.addLast( event );
+ producerBarrier.drainPermits();
+ queueCondition.signal();
}
finally
{
queueLock.unlock();
}
+
+ if ( max )
+ {
+ producerBarrier.acquireUninterruptibly();
+ }
}
else
{
@@ -175,29 +182,24 @@ public final class ThreadedStreamConsumer
}
}
- private void updateAndNotifyReader( @Nonnull Event event )
- {
- queueSize.incrementAndGet();
- queue.addLast( event );
- queueCondition.signal();
- }
-
@Override
public void close()
- throws IOException
+ throws IOException
{
if ( stop.compareAndSet( false, true ) )
{
queue.addLast( END_ITEM );
+ queueLock.lock();
try
{
- queueLock.lock();
queueCondition.signal();
}
finally
{
queueLock.unlock();
}
+
+ producerBarrier.release( 2 );
}
if ( pumper.hasErrors() )
diff --git a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
index 54491a4..83b6c50 100644
--- a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
+++ b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
@@ -22,13 +22,18 @@ package org.apache.maven.plugin.surefire.booterclient.output;
import org.apache.maven.surefire.api.event.Event;
import org.apache.maven.surefire.api.event.StandardStreamOutWithNewLineEvent;
import org.apache.maven.surefire.extensions.EventHandler;
+import org.junit.Ignore;
import org.junit.Test;
import javax.annotation.Nonnull;
-
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import static org.apache.maven.surefire.api.report.RunMode.NORMAL_RUN;
+import static org.fest.assertions.Assertions.assertThat;
/**
*
@@ -37,38 +42,146 @@ import static org.apache.maven.surefire.api.report.RunMode.NORMAL_RUN;
public class ThreadedStreamConsumerTest
{
@Test
+ @Ignore
public void test() throws Exception
{
+ final CountDownLatch countDown = new CountDownLatch( 100_000 );
EventHandler<Event> handler = new EventHandler<Event>()
{
- private int i;
+ private final AtomicInteger i = new AtomicInteger();
@Override
public void handleEvent( @Nonnull Event event )
{
+ //System.out.println(i.get());
+ countDown.countDown();
try
{
- System.out.println( Thread.currentThread() );
- if ( i++ % 5000 == 0 )
+ if ( i.incrementAndGet() % 11_000 == 0 )
{
- TimeUnit.MILLISECONDS.sleep( 500L );
+ TimeUnit.MILLISECONDS.sleep( 10L );
}
}
catch ( InterruptedException e )
{
- e.printStackTrace();
+ throw new IllegalStateException( e );
}
}
};
ThreadedStreamConsumer streamConsumer = new ThreadedStreamConsumer( handler );
- for ( int i = 0; i < 11_000; i++ )
+ for ( int i = 0; i < 100_000; i++ )
{
- System.out.println( i );
streamConsumer.handleEvent( new StandardStreamOutWithNewLineEvent( NORMAL_RUN, "" ) );
}
+ assertThat( countDown.await( 10L, TimeUnit.MINUTES ) ).isTrue();
+
streamConsumer.close();
}
+
+ @Test
+ public void test3() throws Exception
+ {
+ QueueSynchronizer<String> sync = new QueueSynchronizer<>( 2 );
+ sync.pushNext( "1" );
+ sync.pushNext( "2" );
+ //sync.pushNext( "3" );
+ String s1 = sync.awaitNext();
+ String s2 = sync.awaitNext();
+ //String s3 = sync.awaitNext();
+ }
+
+ static class QueueSynchronizer<T>
+ {
+ private final AtomicInteger queueSize = new AtomicInteger();
+
+ QueueSynchronizer( int max )
+ {
+ this.max = max;
+ }
+
+ private class SyncT1 extends AbstractQueuedSynchronizer
+ {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected int tryAcquireShared( int arg )
+ {
+ return queueSize.get() == 0 ? -1 : 1;
+ }
+
+ @Override
+ protected boolean tryReleaseShared( int arg )
+ {
+ return true;
+ }
+
+ void waitIfZero() throws InterruptedException
+ {
+ acquireSharedInterruptibly( 1 );
+ }
+
+ void release()
+ {
+ releaseShared( 0 );
+ }
+ }
+
+ private class SyncT2 extends AbstractQueuedSynchronizer
+ {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected int tryAcquireShared( int arg )
+ {
+ return queueSize.get() < max ? 1 : -1;
+ }
+
+ @Override
+ protected boolean tryReleaseShared( int arg )
+ {
+ return true;
+ }
+
+ void awaitMax()
+ {
+ acquireShared( 1 );
+ }
+
+ void tryRelease()
+ {
+ if ( queueSize.get() == 0 )
+ {
+ releaseShared( 0 );
+ }
+ }
+ }
+
+ private final SyncT1 t1 = new SyncT1();
+ private final SyncT2 t2 = new SyncT2();
+ private final ConcurrentLinkedDeque<T> queue = new ConcurrentLinkedDeque<>();
+ private final int max;
+
+ void pushNext( T t )
+ {
+ queue.addLast( t );
+ int previousCount = queueSize.get();
+ t2.awaitMax();
+ queueSize.incrementAndGet();
+ if ( previousCount == 0 )
+ {
+ t1.release();
+ }
+ }
+
+ T awaitNext() throws InterruptedException
+ {
+ t2.tryRelease();
+ t1.waitIfZero();
+ queueSize.decrementAndGet();
+ return queue.pollFirst();
+ }
+ }
}