You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by ti...@apache.org on 2021/04/05 11:54:05 UTC

[maven-surefire] 01/01: dry pipes and guarantee that tcp server is on acceptance before JAR execution

This is an automated email from the ASF dual-hosted git repository.

tibordigana pushed a commit to branch dry-pipes-tcp-guarantees
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git

commit 64bb1471c02d7d387fe2c7f64a52e77d45dd67df
Author: Tibor Digana <ti...@gmail.com>
AuthorDate: Mon Apr 5 13:53:44 2021 +0200

    dry pipes and guarantee that tcp server is on acceptance before JAR execution
---
 .../plugin/surefire/booterclient/ForkStarter.java  |  30 ++-
 .../surefire/extensions/LegacyForkChannel.java     |  42 +++-
 .../surefire/extensions/SurefireForkChannel.java   | 239 +++++++++++++++------
 .../maven/plugin/surefire/extensions/E2ETest.java  |   3 +-
 .../maven/surefire/extensions/ForkChannelTest.java |   4 +-
 ...refireMasterProcessChannelProcessorFactory.java |   2 +-
 .../surefire/extensions/CloseableDaemonThread.java |   5 +-
 ...CloseableDaemonThread.java => Completable.java} |  23 +-
 .../maven/surefire/extensions/ForkChannel.java     |  18 +-
 .../{CloseableDaemonThread.java => Stoppable.java} |  13 +-
 .../CountDownLauncher.java}                        |  34 ++-
 11 files changed, 280 insertions(+), 133 deletions(-)

diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
index 200d2a8..2cad2a3 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
@@ -41,11 +41,12 @@ import org.apache.maven.surefire.api.booter.Shutdown;
 import org.apache.maven.surefire.booter.StartupConfiguration;
 import org.apache.maven.surefire.booter.SurefireBooterForkException;
 import org.apache.maven.surefire.booter.SurefireExecutionException;
-import org.apache.maven.surefire.extensions.CloseableDaemonThread;
+import org.apache.maven.surefire.extensions.Completable;
 import org.apache.maven.surefire.extensions.EventHandler;
 import org.apache.maven.surefire.extensions.ForkChannel;
 import org.apache.maven.surefire.extensions.ForkNodeFactory;
 import org.apache.maven.surefire.api.fork.ForkNodeArguments;
+import org.apache.maven.surefire.extensions.Stoppable;
 import org.apache.maven.surefire.extensions.util.CommandlineExecutor;
 import org.apache.maven.surefire.extensions.util.CommandlineStreams;
 import org.apache.maven.surefire.extensions.util.CountdownCloseable;
@@ -628,31 +629,29 @@ public class ForkStarter
         Integer result = null;
         RunResult runResult = null;
         SurefireBooterForkException booterForkException = null;
-        CloseableDaemonThread in = null;
-        CloseableDaemonThread out = null;
-        CloseableDaemonThread err = null;
+        Stoppable err = null;
         DefaultReporterFactory reporter = forkClient.getDefaultReporterFactory();
         currentForkClients.add( forkClient );
         CountdownCloseable countdownCloseable =
             new CountdownCloseable( eventConsumer, forkChannel.getCountdownCloseablePermits() );
         try ( CommandlineExecutor exec = new CommandlineExecutor( cli, countdownCloseable ) )
         {
+            Completable client = forkChannel.connectToClient();
             CommandlineStreams streams = exec.execute();
             closer.addCloseable( streams );
 
-            forkChannel.connectToClient();
-            log.debug( "Fork Channel [" + forkNumber + "] connected to the client." );
-
-            in = forkChannel.bindCommandReader( commandReader, streams.getStdInChannel() );
-            in.start();
+            forkChannel.bindCommandReader( commandReader, streams.getStdInChannel() );
 
-            out = forkChannel.bindEventHandler( eventConsumer, countdownCloseable, streams.getStdOutChannel() );
-            out.start();
+            forkChannel.bindEventHandler( eventConsumer, countdownCloseable, streams.getStdOutChannel() );
 
             EventHandler<String> errConsumer = new NativeStdErrStreamConsumer( log );
-            err = new LineConsumerThread( "fork-" + forkNumber + "-err-thread", streams.getStdErrChannel(),
-                errConsumer, countdownCloseable );
-            err.start();
+            LineConsumerThread stdErr = new LineConsumerThread( "fork-" + forkNumber + "-err-thread",
+                streams.getStdErrChannel(), errConsumer, countdownCloseable );
+            err = stdErr;
+            stdErr.start();
+
+            client.complete();
+            log.debug( "Fork Channel [" + forkNumber + "] connected to the client." );
 
             result = exec.awaitExit();
 
@@ -670,8 +669,7 @@ public class ForkStarter
         {
             log.error( "Closing the streams after (InterruptedException) '" + e.getLocalizedMessage() + "'" );
             // maybe implement it in the Future.cancel() of the extension or similar
-            in.disable();
-            out.disable();
+            forkChannel.disable();
             err.disable();
         }
         catch ( Exception e )
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java
index e18f77d..226adaa 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java
@@ -22,6 +22,7 @@ package org.apache.maven.plugin.surefire.extensions;
 import org.apache.maven.surefire.api.event.Event;
 import org.apache.maven.surefire.extensions.CloseableDaemonThread;
 import org.apache.maven.surefire.extensions.CommandReader;
+import org.apache.maven.surefire.extensions.Completable;
 import org.apache.maven.surefire.extensions.EventHandler;
 import org.apache.maven.surefire.extensions.ForkChannel;
 import org.apache.maven.surefire.api.fork.ForkNodeArguments;
@@ -41,14 +42,18 @@ import java.nio.channels.WritableByteChannel;
  */
 final class LegacyForkChannel extends ForkChannel
 {
+    private CloseableDaemonThread commandReaderBindings;
+    private CloseableDaemonThread eventHandlerBindings;
+
     LegacyForkChannel( @Nonnull ForkNodeArguments arguments )
     {
         super( arguments );
     }
 
     @Override
-    public void connectToClient()
+    public Completable connectToClient()
     {
+        return Completable.EMPTY_COMPLETABLE;
     }
 
     @Override
@@ -64,20 +69,37 @@ final class LegacyForkChannel extends ForkChannel
     }
 
     @Override
-    public CloseableDaemonThread bindCommandReader( @Nonnull CommandReader commands,
-                                                    WritableByteChannel stdIn )
+    public void bindCommandReader( @Nonnull CommandReader commands, WritableByteChannel stdIn )
+    {
+        ForkNodeArguments args = getArguments();
+        String threadName = "commands-fork-" + args.getForkChannelId();
+        commandReaderBindings = new StreamFeeder( threadName, stdIn, commands, args.getConsoleLogger() );
+        commandReaderBindings.start();
+    }
+
+    @Override
+    public void bindEventHandler( @Nonnull EventHandler<Event> eventHandler,
+                                  @Nonnull CountdownCloseable countdownCloseable,
+                                  ReadableByteChannel stdOut )
     {
-        return new StreamFeeder( "std-in-fork-" + getArguments().getForkChannelId(), stdIn, commands,
-            getArguments().getConsoleLogger() );
+        ForkNodeArguments args = getArguments();
+        String threadName = "fork-" + args.getForkChannelId() + "-event-thread";
+        eventHandlerBindings = new EventConsumerThread( threadName, stdOut, eventHandler, countdownCloseable, args );
+        eventHandlerBindings.start();
     }
 
     @Override
-    public CloseableDaemonThread bindEventHandler( @Nonnull EventHandler<Event> eventHandler,
-                                                   @Nonnull CountdownCloseable countdownCloseable,
-                                                   ReadableByteChannel stdOut )
+    public void disable()
     {
-        return new EventConsumerThread( "fork-" + getArguments().getForkChannelId() + "-event-thread", stdOut,
-            eventHandler, countdownCloseable, getArguments() );
+        if ( eventHandlerBindings != null )
+        {
+            eventHandlerBindings.disable();
+        }
+
+        if ( commandReaderBindings != null )
+        {
+            commandReaderBindings.disable();
+        }
     }
 
     @Override
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
index 640d562..b7aeddf 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
@@ -21,11 +21,13 @@ package org.apache.maven.plugin.surefire.extensions;
 
 import org.apache.maven.plugin.surefire.booterclient.output.NativeStdOutStreamConsumer;
 import org.apache.maven.surefire.api.event.Event;
+import org.apache.maven.surefire.api.fork.ForkNodeArguments;
 import org.apache.maven.surefire.extensions.CloseableDaemonThread;
 import org.apache.maven.surefire.extensions.CommandReader;
+import org.apache.maven.surefire.extensions.Completable;
 import org.apache.maven.surefire.extensions.EventHandler;
 import org.apache.maven.surefire.extensions.ForkChannel;
-import org.apache.maven.surefire.api.fork.ForkNodeArguments;
+import org.apache.maven.surefire.extensions.util.CountDownLauncher;
 import org.apache.maven.surefire.extensions.util.CountdownCloseable;
 import org.apache.maven.surefire.extensions.util.LineConsumerThread;
 
@@ -39,11 +41,13 @@ import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousServerSocketChannel;
 import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.CompletionHandler;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static java.net.StandardSocketOptions.SO_KEEPALIVE;
 import static java.net.StandardSocketOptions.SO_REUSEADDR;
@@ -79,8 +83,13 @@ final class SurefireForkChannel extends ForkChannel
     private final String localHost;
     private final int localPort;
     private final String sessionId;
-    private volatile AsynchronousSocketChannel worker;
+    private final AtomicReference<AsynchronousSocketChannel> worker = new AtomicReference<>();
+    private final Bindings bindings = new Bindings( 3 );
     private volatile LineConsumerThread out;
+    private volatile CloseableDaemonThread commandReaderBindings;
+    private volatile CloseableDaemonThread eventHandlerBindings;
+    private volatile EventBindings eventBindings;
+    private volatile CommandBindings commandBindings;
 
     SurefireForkChannel( @Nonnull ForkNodeArguments arguments ) throws IOException
     {
@@ -96,45 +105,73 @@ final class SurefireForkChannel extends ForkChannel
     }
 
     @Override
-    public void connectToClient() throws IOException
+    public Completable connectToClient()
     {
-        if ( worker != null )
+        if ( worker.get() != null )
         {
             throw new IllegalStateException( "already accepted TCP client connection" );
         }
+        AcceptanceHandler result = new AcceptanceHandler();
+        server.accept( null, result );
+        return result;
+    }
 
-        try
-        {
-            worker = server.accept().get();
-            verifySessionId();
-        }
-        catch ( InterruptedException e )
+    @Override
+    public String getForkNodeConnectionString()
+    {
+        return "tcp://" + localHost + ":" + localPort + "?sessionId=" + sessionId;
+    }
+
+    @Override
+    public int getCountdownCloseablePermits()
+    {
+        return 3;
+    }
+
+    @Override
+    public void bindCommandReader( @Nonnull CommandReader commands, WritableByteChannel stdIn )
+    {
+        commandBindings = new CommandBindings( commands );
+
+        bindings.countDown();
+    }
+
+    @Override
+    public void bindEventHandler( @Nonnull EventHandler<Event> eventHandler,
+                                  @Nonnull CountdownCloseable countdown,
+                                  ReadableByteChannel stdOut )
+    {
+        ForkNodeArguments args = getArguments();
+        out = new LineConsumerThread( "fork-" + args.getForkChannelId() + "-out-thread", stdOut,
+            new NativeStdOutStreamConsumer( args.getConsoleLogger() ), countdown );
+        out.start();
+
+        eventBindings = new EventBindings( eventHandler, countdown );
+
+        bindings.countDown();
+    }
+
+    @Override
+    public void disable()
+    {
+        if ( eventHandlerBindings != null )
         {
-            throw new IOException( e.getLocalizedMessage(), e );
+            eventHandlerBindings.disable();
         }
-        catch ( ExecutionException e )
+
+        if ( commandReaderBindings != null )
         {
-            throw new IOException( e.getLocalizedMessage(), e.getCause() );
+            commandReaderBindings.disable();
         }
     }
 
-    private void verifySessionId() throws InterruptedException, ExecutionException, IOException
+    @Override
+    public void close() throws IOException
     {
-        ByteBuffer buffer = ByteBuffer.allocate( sessionId.length() );
-        int read;
-        do
-        {
-            read = worker.read( buffer ).get();
-        } while ( read != -1 && buffer.hasRemaining() );
-        if ( read == -1 )
-        {
-            throw new IOException( "Channel closed while verifying the client." );
-        }
-        ( (Buffer) buffer ).flip();
-        String clientSessionId = new String( buffer.array(), US_ASCII );
-        if ( !clientSessionId.equals( sessionId ) )
+        //noinspection unused,EmptyTryBlock,EmptyTryBlock
+        try ( Closeable c1 = worker.get(); Closeable c2 = server; Closeable c3 = out )
         {
-            throw new InvalidSessionIdException( clientSessionId, sessionId );
+            // only close all channels
         }
     }
 
@@ -151,51 +188,131 @@ final class SurefireForkChannel extends ForkChannel
         }
     }
 
-    @Override
-    public String getForkNodeConnectionString()
+    private final class AcceptanceHandler
+        implements CompletionHandler<AsynchronousSocketChannel, Void>, Completable
     {
-        return "tcp://" + localHost + ":" + localPort + "?sessionId=" + sessionId;
-    }
+        private final CountDownLatch acceptanceSynchronizer = new CountDownLatch( 1 );
+        private final CountDownLatch authSynchronizer = new CountDownLatch( 1 );
+        private volatile String messageOfIOException;
+        private volatile String messageOfInvalidSessionIdException;
 
-    @Override
-    public int getCountdownCloseablePermits()
-    {
-        return 3;
+        @Override
+        public void completed( AsynchronousSocketChannel channel, Void attachment )
+        {
+            if ( worker.compareAndSet( null, channel ) )
+            {
+                acceptanceSynchronizer.countDown();
+                final ByteBuffer buffer = ByteBuffer.allocate( sessionId.length() );
+                channel.read( buffer, null, new CompletionHandler<Integer, Object>()
+                {
+                    @Override
+                    public void completed( Integer read, Object attachment )
+                    {
+                        if ( read == -1 )
+                        {
+                            messageOfIOException = "Channel closed while verifying the client.";
+                        }
+                        ( (Buffer) buffer ).flip();
+                        String clientSessionId = new String( buffer.array(), US_ASCII );
+                        if ( !clientSessionId.equals( sessionId ) )
+                        {
+                            messageOfInvalidSessionIdException = "The actual sessionId '" + clientSessionId
+                                + "' does not match '" + sessionId + "'.";
+                        }
+                        authSynchronizer.countDown();
+
+                        bindings.countDown();
+                    }
+
+                    @Override
+                    public void failed( Throwable exception, Object attachment )
+                    {
+                        getArguments().dumpStreamException( exception );
+                    }
+                } );
+            }
+            else
+            {
+                getArguments().dumpStreamText( "Another TCP client attempts to connect." );
+            }
+        }
+
+        @Override
+        public void failed( Throwable exception, Void attachment )
+        {
+            getArguments().dumpStreamException( exception );
+            acceptanceSynchronizer.countDown();
+        }
+
+        @Override
+        public void complete() throws IOException, InterruptedException
+        {
+            completeAcceptance();
+            authSynchronizer.await();
+        }
+
+        void completeAcceptance() throws InterruptedException
+        {
+            acceptanceSynchronizer.await();
+        }
     }
 
-    @Override
-    public CloseableDaemonThread bindCommandReader( @Nonnull CommandReader commands,
-                                                    WritableByteChannel stdIn )
+    private class EventBindings
     {
-        // dont use newBufferedChannel here - may cause the command is not sent and the JVM hangs
-        // only newChannel flushes the message
-        // newBufferedChannel does not flush
-        WritableByteChannel channel = newChannel( newOutputStream( worker ) );
-        return new StreamFeeder( "commands-fork-" + getArguments().getForkChannelId(), channel, commands,
-            getArguments().getConsoleLogger() );
+        private final EventHandler<Event> eventHandler;
+        private final CountdownCloseable countdown;
+
+        private EventBindings( EventHandler<Event> eventHandler, CountdownCloseable countdown )
+        {
+            this.eventHandler = eventHandler;
+            this.countdown = countdown;
+        }
+
+        void bindEventHandler( AsynchronousSocketChannel source )
+        {
+            ForkNodeArguments args = getArguments();
+            String threadName = "fork-" + args.getForkChannelId() + "-event-thread";
+            ReadableByteChannel channel = newBufferedChannel( newInputStream( source ) );
+            eventHandlerBindings = new EventConsumerThread( threadName, channel, eventHandler, countdown, args );
+            eventHandlerBindings.start();
+        }
     }
 
-    @Override
-    public CloseableDaemonThread bindEventHandler( @Nonnull EventHandler<Event> eventHandler,
-                                                   @Nonnull CountdownCloseable countdownCloseable,
-                                                   ReadableByteChannel stdOut )
+    private class CommandBindings
     {
-        out = new LineConsumerThread( "fork-" + getArguments().getForkChannelId() + "-out-thread", stdOut,
-            new NativeStdOutStreamConsumer( getArguments().getConsoleLogger() ), countdownCloseable );
-        out.start();
+        private final CommandReader commands;
 
-        ReadableByteChannel channel = newBufferedChannel( newInputStream( worker ) );
-        return new EventConsumerThread( "fork-" + getArguments().getForkChannelId() + "-event-thread", channel,
-            eventHandler, countdownCloseable, getArguments() );
+        private CommandBindings( CommandReader commands )
+        {
+            this.commands = commands;
+        }
+
+        void bindCommandSender( AsynchronousSocketChannel source )
+        {
+            // do not use newBufferedChannel here - it may cause the command not to be sent and the JVM to hang
+            // only newChannel flushes the message
+            // newBufferedChannel does not flush
+            ForkNodeArguments args = getArguments();
+            WritableByteChannel channel = newChannel( newOutputStream( source ) );
+            String threadName = "commands-fork-" + args.getForkChannelId();
+            commandReaderBindings = new StreamFeeder( threadName, channel, commands, args.getConsoleLogger() );
+            commandReaderBindings.start();
+        }
     }
 
-    @Override
-    public void close() throws IOException
+    private class Bindings extends CountDownLauncher
     {
-        //noinspection unused,EmptyTryBlock,EmptyTryBlock
-        try ( Closeable c1 = worker; Closeable c2 = server; Closeable c3 = out )
+        private Bindings( int count )
         {
-            // only close all channels
+            super( count );
+        }
+
+        @Override
+        protected void job()
+        {
+            AsynchronousSocketChannel channel = worker.get();
+            eventBindings.bindEventHandler( channel );
+            commandBindings.bindCommandSender( channel );
         }
     }
 }
diff --git a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/extensions/E2ETest.java b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/extensions/E2ETest.java
index 87bcca7..60928fb 100644
--- a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/extensions/E2ETest.java
+++ b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/extensions/E2ETest.java
@@ -170,8 +170,7 @@ public class E2ETest
 
         ThreadedStreamConsumer queue = new ThreadedStreamConsumer( h );
 
-        server.bindEventHandler( queue, new CountdownCloseable( new DummyCloseable(), 1 ), new DummyReadableChannel() )
-            .start();
+        server.bindEventHandler( queue, new CountdownCloseable( new DummyCloseable(), 1 ), new DummyReadableChannel() );
 
         assertThat( awaitHandlerFinished.await( 30L, TimeUnit.SECONDS ) )
             .isTrue();
diff --git a/maven-surefire-common/src/test/java/org/apache/maven/surefire/extensions/ForkChannelTest.java b/maven-surefire-common/src/test/java/org/apache/maven/surefire/extensions/ForkChannelTest.java
index 428ab10..b34ecc0 100644
--- a/maven-surefire-common/src/test/java/org/apache/maven/surefire/extensions/ForkChannelTest.java
+++ b/maven-surefire-common/src/test/java/org/apache/maven/surefire/extensions/ForkChannelTest.java
@@ -158,10 +158,10 @@ public class ForkChannelTest
             client.start();
 
             channel.connectToClient();
-            channel.bindCommandReader( commandReader, null ).start();
+            channel.bindCommandReader( commandReader, null );
             ReadableByteChannel stdOut = mock( ReadableByteChannel.class );
             when( stdOut.read( any( ByteBuffer.class ) ) ).thenReturn( -1 );
-            channel.bindEventHandler( consumer, cc, stdOut ).start();
+            channel.bindEventHandler( consumer, cc, stdOut );
 
             commandReader.noop();
 
diff --git a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java
index 6232209..f6b3e6a 100644
--- a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java
+++ b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java
@@ -132,7 +132,7 @@ public class SurefireMasterProcessChannelProcessorFactory
         {
             if ( clientSocketChannel.supportedOptions().contains( option ) )
             {
-                clientSocketChannel.setOption( option, true );
+                // clientSocketChannel.setOption( option, true );
             }
         }
     }
diff --git a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java
index cc2964c..f0f32d3 100644
--- a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java
+++ b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java
@@ -20,18 +20,15 @@ package org.apache.maven.surefire.extensions;
  */
 
 import javax.annotation.Nonnull;
-import java.io.Closeable;
 
 /**
  * The base thread class used to handle a stream, transforms the stream to an object.
  */
-public abstract class CloseableDaemonThread extends Thread implements Closeable
+public abstract class CloseableDaemonThread extends Thread implements Stoppable
 {
     protected CloseableDaemonThread( @Nonnull String threadName )
     {
         setName( threadName );
         setDaemon( true );
     }
-
-    public abstract void disable();
 }
diff --git a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/Completable.java
similarity index 64%
copy from surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java
copy to surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/Completable.java
index cc2964c..30506b2 100644
--- a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java
+++ b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/Completable.java
@@ -19,19 +19,24 @@ package org.apache.maven.surefire.extensions;
  * under the License.
  */
 
-import javax.annotation.Nonnull;
-import java.io.Closeable;
+import java.io.IOException;
 
 /**
- * The base thread class used to handle a stream, transforms the stream to an object.
+ * Splits the execution of a task from its completion.
+ * The method {@link #complete()} waits for the task to complete, or fails with an exception.
+ *
+ * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
+ * @since 3.0.0-M5
  */
-public abstract class CloseableDaemonThread extends Thread implements Closeable
+public interface Completable
 {
-    protected CloseableDaemonThread( @Nonnull String threadName )
+    Completable EMPTY_COMPLETABLE = new Completable()
     {
-        setName( threadName );
-        setDaemon( true );
-    }
+        @Override
+        public void complete()
+        {
+        }
+    };
 
-    public abstract void disable();
+    void complete() throws IOException, InterruptedException;
 }
diff --git a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java
index 62e88be..26145fb 100644
--- a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java
+++ b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java
@@ -53,7 +53,7 @@ public abstract class ForkChannel implements Closeable
         this.arguments = arguments;
     }
 
-    public abstract void connectToClient() throws IOException;
+    public abstract Completable connectToClient() throws IOException, InterruptedException;
 
     /**
      * This is server related class, which if binds to a TCP port, determines the connection string for the client.
@@ -68,28 +68,26 @@ public abstract class ForkChannel implements Closeable
     public abstract int getCountdownCloseablePermits();
 
     /**
-     * Binds command handler to the channel.
+     * Binds command handler to the channel. Starts a Thread streaming out the commands.
      *
      * @param commands command reader, see {@link CommandReader#readNextCommand()}
      * @param stdIn    optional standard input stream of the JVM to write the encoded commands into it
-     * @return the thread instance to start up in order to stream out the data
      * @throws IOException if an error in the fork channel
      */
-    public abstract CloseableDaemonThread bindCommandReader( @Nonnull CommandReader commands,
-                                                             WritableByteChannel stdIn )
+    public abstract void bindCommandReader( @Nonnull CommandReader commands, WritableByteChannel stdIn )
         throws IOException;
 
     /**
+     * Starts a Thread reading the events.
      *
      * @param eventHandler       event eventHandler
      * @param countdownCloseable count down of the final call of {@link Closeable#close()}
      * @param stdOut             optional standard output stream of the JVM
-     * @return the thread instance to start up in order to stream out the data
      * @throws IOException if an error in the fork channel
      */
-    public abstract CloseableDaemonThread bindEventHandler( @Nonnull EventHandler<Event> eventHandler,
-                                                            @Nonnull CountdownCloseable countdownCloseable,
-                                                            ReadableByteChannel stdOut )
+    public abstract void bindEventHandler( @Nonnull EventHandler<Event> eventHandler,
+                                           @Nonnull CountdownCloseable countdownCloseable,
+                                           ReadableByteChannel stdOut )
         throws IOException;
 
     @Nonnull
@@ -97,4 +95,6 @@ public abstract class ForkChannel implements Closeable
     {
         return arguments;
     }
+
+    public abstract void disable();
 }
diff --git a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/Stoppable.java
similarity index 70%
copy from surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java
copy to surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/Stoppable.java
index cc2964c..e637b3e 100644
--- a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java
+++ b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/Stoppable.java
@@ -19,19 +19,12 @@ package org.apache.maven.surefire.extensions;
  * under the License.
  */
 
-import javax.annotation.Nonnull;
 import java.io.Closeable;
 
 /**
- * The base thread class used to handle a stream, transforms the stream to an object.
+ * Used in {@link CloseableDaemonThread}.
  */
-public abstract class CloseableDaemonThread extends Thread implements Closeable
+public interface Stoppable extends Closeable
 {
-    protected CloseableDaemonThread( @Nonnull String threadName )
-    {
-        setName( threadName );
-        setDaemon( true );
-    }
-
-    public abstract void disable();
+    void disable();
 }
diff --git a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CountDownLauncher.java
similarity index 51%
copy from surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java
copy to surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CountDownLauncher.java
index cc2964c..ec35682 100644
--- a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CloseableDaemonThread.java
+++ b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CountDownLauncher.java
@@ -1,4 +1,4 @@
-package org.apache.maven.surefire.extensions;
+package org.apache.maven.surefire.extensions.util;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,19 +19,35 @@ package org.apache.maven.surefire.extensions;
  * under the License.
  */
 
-import javax.annotation.Nonnull;
-import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * The base thread class used to handle a stream, transforms the stream to an object.
+ * Counts down on each call of {@link #countDown()}; the call which reaches zero executes {@link #job()}.
+ *
+ * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
+ * @since 3.0.0-M5
  */
-public abstract class CloseableDaemonThread extends Thread implements Closeable
+public abstract class CountDownLauncher
 {
-    protected CloseableDaemonThread( @Nonnull String threadName )
+    private final AtomicInteger countDown;
+
+    public CountDownLauncher( int count )
     {
-        setName( threadName );
-        setDaemon( true );
+        if ( count <= 0 )
+        {
+            throw new IllegalStateException( "count=" + count + " should be greater than zero" );
+        }
+
+        countDown = new AtomicInteger( count );
     }
 
-    public abstract void disable();
+    protected abstract void job();
+
+    public void countDown()
+    {
+        if ( countDown.decrementAndGet() == 0 )
+        {
+            job();
+        }
+    }
 }