You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by ti...@apache.org on 2021/04/05 11:54:05 UTC
[maven-surefire] 01/01: dry pipes and guarantee that tcp server is
on acceptance before JAR execution
This is an automated email from the ASF dual-hosted git repository.
tibordigana pushed a commit to branch dry-pipes-tcp-guarantees
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git
commit 64bb1471c02d7d387fe2c7f64a52e77d45dd67df
Author: Tibor Digana <ti...@gmail.com>
AuthorDate: Mon Apr 5 13:53:44 2021 +0200
dry pipes and guarantee that tcp server is on acceptance before JAR execution
---
.../plugin/surefire/booterclient/ForkStarter.java | 30 ++-
.../surefire/extensions/LegacyForkChannel.java | 42 +++-
.../surefire/extensions/SurefireForkChannel.java | 239 +++++++++++++++------
.../maven/plugin/surefire/extensions/E2ETest.java | 3 +-
.../maven/surefire/extensions/ForkChannelTest.java | 4 +-
...refireMasterProcessChannelProcessorFactory.java | 2 +-
.../surefire/extensions/CloseableDaemonThread.java | 5 +-
...CloseableDaemonThread.java => Completable.java} | 23 +-
.../maven/surefire/extensions/ForkChannel.java | 18 +-
.../{CloseableDaemonThread.java => Stoppable.java} | 13 +-
.../CountDownLauncher.java} | 34 ++-
11 files changed, 280 insertions(+), 133 deletions(-)
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
index 200d2a8..2cad2a3 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
@@ -41,11 +41,12 @@ import org.apache.maven.surefire.api.booter.Shutdown;
import org.apache.maven.surefire.booter.StartupConfiguration;
import org.apache.maven.surefire.booter.SurefireBooterForkException;
import org.apache.maven.surefire.booter.SurefireExecutionException;
-import org.apache.maven.surefire.extensions.CloseableDaemonThread;
+import org.apache.maven.surefire.extensions.Completable;
import org.apache.maven.surefire.extensions.EventHandler;
import org.apache.maven.surefire.extensions.ForkChannel;
import org.apache.maven.surefire.extensions.ForkNodeFactory;
import org.apache.maven.surefire.api.fork.ForkNodeArguments;
+import org.apache.maven.surefire.extensions.Stoppable;
import org.apache.maven.surefire.extensions.util.CommandlineExecutor;
import org.apache.maven.surefire.extensions.util.CommandlineStreams;
import org.apache.maven.surefire.extensions.util.CountdownCloseable;
@@ -628,31 +629,29 @@ public class ForkStarter
Integer result = null;
RunResult runResult = null;
SurefireBooterForkException booterForkException = null;
- CloseableDaemonThread in = null;
- CloseableDaemonThread out = null;
- CloseableDaemonThread err = null;
+ Stoppable err = null;
DefaultReporterFactory reporter = forkClient.getDefaultReporterFactory();
currentForkClients.add( forkClient );
CountdownCloseable countdownCloseable =
new CountdownCloseable( eventConsumer, forkChannel.getCountdownCloseablePermits() );
try ( CommandlineExecutor exec = new CommandlineExecutor( cli, countdownCloseable ) )
{
+ Completable client = forkChannel.connectToClient();
CommandlineStreams streams = exec.execute();
closer.addCloseable( streams );
- forkChannel.connectToClient();
- log.debug( "Fork Channel [" + forkNumber + "] connected to the client." );
-
- in = forkChannel.bindCommandReader( commandReader, streams.getStdInChannel() );
- in.start();
+ forkChannel.bindCommandReader( commandReader, streams.getStdInChannel() );
- out = forkChannel.bindEventHandler( eventConsumer, countdownCloseable, streams.getStdOutChannel() );
- out.start();
+ forkChannel.bindEventHandler( eventConsumer, countdownCloseable, streams.getStdOutChannel() );
EventHandler<String> errConsumer = new NativeStdErrStreamConsumer( log );
- err = new LineConsumerThread( "fork-" + forkNumber + "-err-thread", streams.getStdErrChannel(),
- errConsumer, countdownCloseable );
- err.start();
+ LineConsumerThread stdErr = new LineConsumerThread( "fork-" + forkNumber + "-err-thread",
+ streams.getStdErrChannel(), errConsumer, countdownCloseable );
+ err = stdErr;
+ stdErr.start();
+
+ client.complete();
+ log.debug( "Fork Channel [" + forkNumber + "] connected to the client." );
result = exec.awaitExit();
@@ -670,8 +669,7 @@ public class ForkStarter
{
log.error( "Closing the streams after (InterruptedException) '" + e.getLocalizedMessage() + "'" );
// maybe implement it in the Future.cancel() of the extension or similar
- in.disable();
- out.disable();
+ forkChannel.disable();
err.disable();
}
catch ( Exception e )
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java
index e18f77d..226adaa 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java
@@ -22,6 +22,7 @@ package org.apache.maven.plugin.surefire.extensions;
import org.apache.maven.surefire.api.event.Event;
import org.apache.maven.surefire.extensions.CloseableDaemonThread;
import org.apache.maven.surefire.extensions.CommandReader;
+import org.apache.maven.surefire.extensions.Completable;
import org.apache.maven.surefire.extensions.EventHandler;
import org.apache.maven.surefire.extensions.ForkChannel;
import org.apache.maven.surefire.api.fork.ForkNodeArguments;
@@ -41,14 +42,18 @@ import java.nio.channels.WritableByteChannel;
*/
final class LegacyForkChannel extends ForkChannel
{
+ private CloseableDaemonThread commandReaderBindings;
+ private CloseableDaemonThread eventHandlerBindings;
+
LegacyForkChannel( @Nonnull ForkNodeArguments arguments )
{
super( arguments );
}
@Override
- public void connectToClient()
+ public Completable connectToClient()
{
+ return Completable.EMPTY_COMPLETABLE;
}
@Override
@@ -64,20 +69,37 @@ final class LegacyForkChannel extends ForkChannel
}
@Override
- public CloseableDaemonThread bindCommandReader( @Nonnull CommandReader commands,
- WritableByteChannel stdIn )
+ public void bindCommandReader( @Nonnull CommandReader commands, WritableByteChannel stdIn )
+ {
+ ForkNodeArguments args = getArguments();
+ String threadName = "commands-fork-" + args.getForkChannelId();
+ commandReaderBindings = new StreamFeeder( threadName, stdIn, commands, args.getConsoleLogger() );
+ commandReaderBindings.start();
+ }
+
+ @Override
+ public void bindEventHandler( @Nonnull EventHandler<Event> eventHandler,
+ @Nonnull CountdownCloseable countdownCloseable,
+ ReadableByteChannel stdOut )
{
- return new StreamFeeder( "std-in-fork-" + getArguments().getForkChannelId(), stdIn, commands,
- getArguments().getConsoleLogger() );
+ ForkNodeArguments args = getArguments();
+ String threadName = "fork-" + args.getForkChannelId() + "-event-thread";
+ eventHandlerBindings = new EventConsumerThread( threadName, stdOut, eventHandler, countdownCloseable, args );
+ eventHandlerBindings.start();
}
@Override
- public CloseableDaemonThread bindEventHandler( @Nonnull EventHandler<Event> eventHandler,
- @Nonnull CountdownCloseable countdownCloseable,
- ReadableByteChannel stdOut )
+ public void disable()
{
- return new EventConsumerThread( "fork-" + getArguments().getForkChannelId() + "-event-thread", stdOut,
- eventHandler, countdownCloseable, getArguments() );
+ if ( eventHandlerBindings != null )
+ {
+ eventHandlerBindings.disable();
+ }
+
+ if ( commandReaderBindings != null )
+ {
+ commandReaderBindings.disable();
+ }
}
@Override
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
index 640d562..b7aeddf 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
@@ -21,11 +21,13 @@ package org.apache.maven.plugin.surefire.extensions;
import org.apache.maven.plugin.surefire.booterclient.output.NativeStdOutStreamConsumer;
import org.apache.maven.surefire.api.event.Event;
+import org.apache.maven.surefire.api.fork.ForkNodeArguments;
import org.apache.maven.surefire.extensions.CloseableDaemonThread;
import org.apache.maven.surefire.extensions.CommandReader;
+import org.apache.maven.surefire.extensions.Completable;
import org.apache.maven.surefire.extensions.EventHandler;
import org.apache.maven.surefire.extensions.ForkChannel;
-import org.apache.maven.surefire.api.fork.ForkNodeArguments;
+import org.apache.maven.surefire.extensions.util.CountDownLauncher;
import org.apache.maven.surefire.extensions.util.CountdownCloseable;
import org.apache.maven.surefire.extensions.util.LineConsumerThread;
@@ -39,11 +41,13 @@ import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.CompletionHandler;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
import static java.net.StandardSocketOptions.SO_KEEPALIVE;
import static java.net.StandardSocketOptions.SO_REUSEADDR;
@@ -79,8 +83,13 @@ final class SurefireForkChannel extends ForkChannel
private final String localHost;
private final int localPort;
private final String sessionId;
- private volatile AsynchronousSocketChannel worker;
+ private final AtomicReference<AsynchronousSocketChannel> worker = new AtomicReference<>();
+ private final Bindings bindings = new Bindings( 3 );
private volatile LineConsumerThread out;
+ private volatile CloseableDaemonThread commandReaderBindings;
+ private volatile CloseableDaemonThread eventHandlerBindings;
+ private volatile EventBindings eventBindings;
+ private volatile CommandBindings commandBindings;
SurefireForkChannel( @Nonnull ForkNodeArguments arguments ) throws IOException
{
@@ -96,45 +105,73 @@ final class SurefireForkChannel extends ForkChannel
}
@Override
- public void connectToClient() throws IOException
+ public Completable connectToClient()
{
- if ( worker != null )
+ if ( worker.get() != null )
{
throw new IllegalStateException( "already accepted TCP client connection" );
}
+ AcceptanceHandler result = new AcceptanceHandler();
+ server.accept( null, result );
+ return result;
+ }
- try
- {
- worker = server.accept().get();
- verifySessionId();
- }
- catch ( InterruptedException e )
+ @Override
+ public String getForkNodeConnectionString()
+ {
+ return "tcp://" + localHost + ":" + localPort + "?sessionId=" + sessionId;
+ }
+
+ @Override
+ public int getCountdownCloseablePermits()
+ {
+ return 3;
+ }
+
+ @Override
+ public void bindCommandReader( @Nonnull CommandReader commands, WritableByteChannel stdIn )
+ {
+ commandBindings = new CommandBindings( commands );
+
+ bindings.countDown();
+ }
+
+ @Override
+ public void bindEventHandler( @Nonnull EventHandler<Event> eventHandler,
+ @Nonnull CountdownCloseable countdown,
+ ReadableByteChannel stdOut )
+ {
+ ForkNodeArguments args = getArguments();
+ out = new LineConsumerThread( "fork-" + args.getForkChannelId() + "-out-thread", stdOut,
+ new NativeStdOutStreamConsumer( args.getConsoleLogger() ), countdown );
+ out.start();
+
+ eventBindings = new EventBindings( eventHandler, countdown );
+
+ bindings.countDown();
+ }
+
+ @Override
+ public void disable()
+ {
+ if ( eventHandlerBindings != null )
{
- throw new IOException( e.getLocalizedMessage(), e );
+ eventHandlerBindings.disable();
}
- catch ( ExecutionException e )
+
+ if ( commandReaderBindings != null )
{
- throw new IOException( e.getLocalizedMessage(), e.getCause() );
+ commandReaderBindings.disable();
}
}
- private void verifySessionId() throws InterruptedException, ExecutionException, IOException
+ @Override
+ public void close() throws IOException
{
- ByteBuffer buffer = ByteBuffer.allocate( sessionId.length() );
- int read;
- do
- {
- read = worker.read( buffer ).get();
- } while ( read != -1 && buffer.hasRemaining() );
- if ( read == -1 )
- {
- throw new IOException( "Channel closed while verifying the client." );
- }
- ( (Buffer) buffer ).flip();
- String clientSessionId = new String( buffer.array(), US_ASCII );
- if ( !clientSessionId.equals( sessionId ) )
+ //noinspection unused,EmptyTryBlock,EmptyTryBlock
+ try ( Closeable c1 = worker.get(); Closeable c2 = server; Closeable c3 = out )
{
- throw new InvalidSessionIdException( clientSessionId, sessionId );
+ // only close all channels
}
}
@@ -151,51 +188,131 @@ final class SurefireForkChannel extends ForkChannel
}
}
- @Override
- public String getForkNodeConnectionString()
+ private final class AcceptanceHandler
+ implements CompletionHandler<AsynchronousSocketChannel, Void>, Completable
{
- return "tcp://" + localHost + ":" + localPort + "?sessionId=" + sessionId;
- }
+ private final CountDownLatch acceptanceSynchronizer = new CountDownLatch( 1 );
+ private final CountDownLatch authSynchronizer = new CountDownLatch( 1 );
+ private volatile String messageOfIOException;
+ private volatile String messageOfInvalidSessionIdException;
- @Override
- public int getCountdownCloseablePermits()
- {
- return 3;
+ @Override
+ public void completed( AsynchronousSocketChannel channel, Void attachment )
+ {
+ if ( worker.compareAndSet( null, channel ) )
+ {
+ acceptanceSynchronizer.countDown();
+ final ByteBuffer buffer = ByteBuffer.allocate( sessionId.length() );
+ channel.read( buffer, null, new CompletionHandler<Integer, Object>()
+ {
+ @Override
+ public void completed( Integer read, Object attachment )
+ {
+ if ( read == -1 )
+ {
+ messageOfIOException = "Channel closed while verifying the client.";
+ }
+ ( (Buffer) buffer ).flip();
+ String clientSessionId = new String( buffer.array(), US_ASCII );
+ if ( !clientSessionId.equals( sessionId ) )
+ {
+ messageOfInvalidSessionIdException = "The actual sessionId '" + clientSessionId
+ + "' does not match '" + sessionId + "'.";
+ }
+ authSynchronizer.countDown();
+
+ bindings.countDown();
+ }
+
+ @Override
+ public void failed( Throwable exception, Object attachment )
+ {
+ getArguments().dumpStreamException( exception );
+ }
+ } );
+ }
+ else
+ {
+ getArguments().dumpStreamText( "Another TCP client attempts to connect." );
+ }
+ }
+
+ @Override
+ public void failed( Throwable exception, Void attachment )
+ {
+ getArguments().dumpStreamException( exception );
+ acceptanceSynchronizer.countDown();
+ }
+
+ @Override
+ public void complete() throws IOException, InterruptedException
+ {
+ completeAcceptance();
+ authSynchronizer.await();
+ }
+
+ void completeAcceptance() throws InterruptedException
+ {
+ acceptanceSynchronizer.await();
+ }
}
- @Override
- public CloseableDaemonThread bindCommandReader( @Nonnull CommandReader commands,
- WritableByteChannel stdIn )
+ private class EventBindings
{
- // dont use newBufferedChannel here - may cause the command is not sent and the JVM hangs
- // only newChannel flushes the message
- // newBufferedChannel does not flush
- WritableByteChannel channel = newChannel( newOutputStream( worker ) );
- return new StreamFeeder( "commands-fork-" + getArguments().getForkChannelId(), channel, commands,
- getArguments().getConsoleLogger() );
+ private final EventHandler<Event> eventHandler;
+ private final CountdownCloseable countdown;
+
+ private EventBindings( EventHandler<Event> eventHandler, CountdownCloseable countdown )
+ {
+ this.eventHandler = eventHandler;
+ this.countdown = countdown;
+ }
+
+ void bindEventHandler( AsynchronousSocketChannel source )
+ {
+ ForkNodeArguments args = getArguments();
+ String threadName = "fork-" + args.getForkChannelId() + "-event-thread";
+ ReadableByteChannel channel = newBufferedChannel( newInputStream( source ) );
+ eventHandlerBindings = new EventConsumerThread( threadName, channel, eventHandler, countdown, args );
+ eventHandlerBindings.start();
+ }
}
- @Override
- public CloseableDaemonThread bindEventHandler( @Nonnull EventHandler<Event> eventHandler,
- @Nonnull CountdownCloseable countdownCloseable,
- ReadableByteChannel stdOut )
+ private class CommandBindings
{
- out = new LineConsumerThread( "fork-" + getArguments().getForkChannelId() + "-out-thread", stdOut,
- new NativeStdOutStreamConsumer( getArguments().getConsoleLogger() ), countdownCloseable );
- out.start();
+ private final CommandReader commands;
- ReadableByteChannel channel = newBufferedChannel( newInputStream( worker ) );
- return new EventConsumerThread( "fork-" + getArguments().getForkChannelId() + "-event-thread", channel,
- eventHandler, countdownCloseable, getArguments() );
+ private CommandBindings( CommandReader commands )
+ {
+ this.commands = commands;
+ }
+
+ void bindCommandSender( AsynchronousSocketChannel source )
+ {
+ // dont use newBufferedChannel here - may cause the command is not sent and the JVM hangs
+ // only newChannel flushes the message
+ // newBufferedChannel does not flush
+ ForkNodeArguments args = getArguments();
+ WritableByteChannel channel = newChannel( newOutputStream( source ) );
+ String threadName = "commands-fork-" + args.getForkChannelId();
+ commandReaderBindings = new StreamFeeder( threadName, channel, commands, args.getConsoleLogger() );
+ commandReaderBindings.start();
+ }
}
- @Override
- public void close() throws IOException
+ private class Bindings extends CountDownLauncher
{
- //noinspection unused,EmptyTryBlock,EmptyTryBlock
- try ( Closeable c1 = worker; Closeable c2 = server; Closeable c3 = out )
+ private Bindings( int count )
{
- // only close all channels
+ super( count );
+ }
+
+ @Override
+ protected void job()
+ {
+ AsynchronousSocketChannel channel = worker.get();
+ eventBindings.bindEventHandler( channel );
+ commandBindings.bindCommandSender( channel );
}
}
}
diff --git a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/extensions/E2ETest.java b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/extensions/E2ETest.java
index 87bcca7..60928fb 100644
--- a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/extensions/E2ETest.java
+++ b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/extensions/E2ETest.java
@@ -170,8 +170,7 @@ public class E2ETest
ThreadedStreamConsumer queue = new ThreadedStreamConsumer( h );
- server.bindEventHandler( queue, new CountdownCloseable( new DummyCloseable(), 1 ), new DummyReadableChannel() )
- .start();
+ server.bindEventHandler( queue, new CountdownCloseable( new DummyCloseable(), 1 ), new DummyReadableChannel() );
assertThat( awaitHandlerFinished.await( 30L, TimeUnit.SECONDS ) )
.isTrue();
diff --git a/maven-surefire-common/src/test/java/org/apache/maven/surefire/extensions/ForkChannelTest.java b/maven-surefire-common/src/test/java/org/apache/maven/surefire/extensions/ForkChannelTest.java
index 428ab10..b34ecc0 100644
--- a/maven-surefire-common/src/test/java/org/apache/maven/surefire/extensions/ForkChannelTest.java
+++ b/maven-surefire-common/src/test/java/org/apache/maven/surefire/extensions/ForkChannelTest.java
@@ -158,10 +158,10 @@ public class ForkChannelTest
client.start();
channel.connectToClient();
- channel.bindCommandReader( commandReader, null ).start();
+ channel.bindCommandReader( commandReader, null );
ReadableByteChannel stdOut = mock( ReadableByteChannel.class );
when( stdOut.read( any( ByteBuffer.class ) ) ).thenReturn( -1 );
- channel.bindEventHandler( consumer, cc, stdOut ).start();
+ channel.bindEventHandler( consumer, cc, stdOut );
commandReader.noop();
diff --git a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java
index 6232209..f6b3e6a 100644
--- a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java
+++ b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java
@@ -132,7 +132,7 @@ public class SurefireMasterProcessChannelProcessorFactory
{
if ( clientSocketChannel.supportedOptions().contains( option ) )
{
- clientSocketChannel.setOption( option, true );
+ // clientSocketChannel.setOption( option, true );
}
}
}
diff --git a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java
index cc2964c..f0f32d3 100644
--- a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java
+++ b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java
@@ -20,18 +20,15 @@ package org.apache.maven.surefire.extensions;
*/
import javax.annotation.Nonnull;
-import java.io.Closeable;
/**
* The base thread class used to handle a stream, transforms the stream to an object.
*/
-public abstract class CloseableDaemonThread extends Thread implements Closeable
+public abstract class CloseableDaemonThread extends Thread implements Stoppable
{
protected CloseableDaemonThread( @Nonnull String threadName )
{
setName( threadName );
setDaemon( true );
}
-
- public abstract void disable();
}
diff --git a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/Completable.java
similarity index 64%
copy from surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java
copy to surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/Completable.java
index cc2964c..30506b2 100644
--- a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java
+++ b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/Completable.java
@@ -19,19 +19,24 @@ package org.apache.maven.surefire.extensions;
* under the License.
*/
-import javax.annotation.Nonnull;
-import java.io.Closeable;
+import java.io.IOException;
/**
- * The base thread class used to handle a stream, transforms the stream to an object.
+ * Forks the execution of task and the task completion.
+ * The method {@link #complete()} waits for the task to complete or fails.
+ *
+ * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
+ * @since 3.0.0-M5
*/
-public abstract class CloseableDaemonThread extends Thread implements Closeable
+public interface Completable
{
- protected CloseableDaemonThread( @Nonnull String threadName )
+ Completable EMPTY_COMPLETABLE = new Completable()
{
- setName( threadName );
- setDaemon( true );
- }
+ @Override
+ public void complete()
+ {
+ }
+ };
- public abstract void disable();
+ void complete() throws IOException, InterruptedException;
}
diff --git a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java
index 62e88be..26145fb 100644
--- a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java
+++ b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java
@@ -53,7 +53,7 @@ public abstract class ForkChannel implements Closeable
this.arguments = arguments;
}
- public abstract void connectToClient() throws IOException;
+ public abstract Completable connectToClient() throws IOException, InterruptedException;
/**
* This is server related class, which if binds to a TCP port, determines the connection string for the client.
@@ -68,28 +68,26 @@ public abstract class ForkChannel implements Closeable
public abstract int getCountdownCloseablePermits();
/**
- * Binds command handler to the channel.
+ * Binds command handler to the channel. Starts a Thread streaming out the commands.
*
* @param commands command reader, see {@link CommandReader#readNextCommand()}
* @param stdIn optional standard input stream of the JVM to write the encoded commands into it
- * @return the thread instance to start up in order to stream out the data
* @throws IOException if an error in the fork channel
*/
- public abstract CloseableDaemonThread bindCommandReader( @Nonnull CommandReader commands,
- WritableByteChannel stdIn )
+ public abstract void bindCommandReader( @Nonnull CommandReader commands, WritableByteChannel stdIn )
throws IOException;
/**
+ * Starts a Thread reading the events.
*
* @param eventHandler event eventHandler
* @param countdownCloseable count down of the final call of {@link Closeable#close()}
* @param stdOut optional standard output stream of the JVM
- * @return the thread instance to start up in order to stream out the data
* @throws IOException if an error in the fork channel
*/
- public abstract CloseableDaemonThread bindEventHandler( @Nonnull EventHandler<Event> eventHandler,
- @Nonnull CountdownCloseable countdownCloseable,
- ReadableByteChannel stdOut )
+ public abstract void bindEventHandler( @Nonnull EventHandler<Event> eventHandler,
+ @Nonnull CountdownCloseable countdownCloseable,
+ ReadableByteChannel stdOut )
throws IOException;
@Nonnull
@@ -97,4 +95,6 @@ public abstract class ForkChannel implements Closeable
{
return arguments;
}
+
+ public abstract void disable();
}
diff --git a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/Stoppable.java
similarity index 70%
copy from surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java
copy to surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/Stoppable.java
index cc2964c..e637b3e 100644
--- a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java
+++ b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/Stoppable.java
@@ -19,19 +19,12 @@ package org.apache.maven.surefire.extensions;
* under the License.
*/
-import javax.annotation.Nonnull;
import java.io.Closeable;
/**
- * The base thread class used to handle a stream, transforms the stream to an object.
+ * Used in {@link CloseableDaemonThread}.
*/
-public abstract class CloseableDaemonThread extends Thread implements Closeable
+public interface Stoppable extends Closeable
{
- protected CloseableDaemonThread( @Nonnull String threadName )
- {
- setName( threadName );
- setDaemon( true );
- }
-
- public abstract void disable();
+ void disable();
}
diff --git a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CountDownLauncher.java
similarity index 51%
copy from surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java
copy to surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CountDownLauncher.java
index cc2964c..ec35682 100644
--- a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java
+++ b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CountDownLauncher.java
@@ -1,4 +1,4 @@
-package org.apache.maven.surefire.extensions;
+package org.apache.maven.surefire.extensions.util;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,19 +19,35 @@ package org.apache.maven.surefire.extensions;
* under the License.
*/
-import javax.annotation.Nonnull;
-import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicInteger;
/**
- * The base thread class used to handle a stream, transforms the stream to an object.
+ * Counts down the calls {@link #countDown()} and the last reaching zero executes the {@link #job()}.
+ *
+ * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
+ * @since 3.0.0-M5
*/
-public abstract class CloseableDaemonThread extends Thread implements Closeable
+public abstract class CountDownLauncher
{
- protected CloseableDaemonThread( @Nonnull String threadName )
+ private final AtomicInteger countDown;
+
+ public CountDownLauncher( int count )
{
- setName( threadName );
- setDaemon( true );
+ if ( count <= 0 )
+ {
+ throw new IllegalStateException( "count=" + count + " should be greater than zero" );
+ }
+
+ countDown = new AtomicInteger( count );
}
- public abstract void disable();
+ protected abstract void job();
+
+ public void countDown()
+ {
+ if ( countDown.decrementAndGet() == 0 )
+ {
+ job();
+ }
+ }
}