You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by ti...@apache.org on 2020/02/08 12:34:09 UTC

[maven-surefire] branch maven2surefire-jvm-communication updated: finished implementation of extension in ForkStarter, unit test and removed unused code (missing JavaDoc)

This is an automated email from the ASF dual-hosted git repository.

tibordigana pushed a commit to branch maven2surefire-jvm-communication
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git


The following commit(s) were added to refs/heads/maven2surefire-jvm-communication by this push:
     new be1981e  finished implementation of extension in ForkStarter, unit test and removed unused code (missing JavaDoc)
be1981e is described below

commit be1981ecfb069eea7dddeeaae4d47504d1d0d598
Author: tibordigana <ti...@apache.org>
AuthorDate: Sat Feb 8 13:33:59 2020 +0100

    finished implementation of extension in ForkStarter, unit test and removed unused code (missing JavaDoc)
---
 .../plugin/surefire/booterclient/ForkStarter.java  |  10 +-
 .../surefire/extensions/LegacyForkChannel.java     |  27 +--
 .../extensions/NetworkingProcessExecutor.java      | 216 ---------------------
 .../plugin/surefire/extensions/StdOutAdapter.java  |  42 ----
 .../surefire/extensions/SurefireForkChannel.java   |  67 +++----
 .../org/apache/maven/surefire/JUnit4SuiteTest.java |   2 +
 .../maven/surefire/extensions/ForkChannelTest.java | 132 +++++++++++++
 .../maven/surefire/extensions/ForkChannel.java     |  33 +---
 .../extensions/util/CountdownCloseable.java        |   7 +-
 9 files changed, 181 insertions(+), 355 deletions(-)

diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
index 8c50bf7..71c0f25 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
@@ -47,6 +47,7 @@ import org.apache.maven.surefire.booter.SurefireBooterForkException;
 import org.apache.maven.surefire.booter.SurefireExecutionException;
 import org.apache.maven.surefire.extensions.ForkChannel;
 import org.apache.maven.surefire.extensions.ForkNodeFactory;
+import org.apache.maven.surefire.extensions.util.LineConsumerThread;
 import org.apache.maven.surefire.providerapi.SurefireProvider;
 import org.apache.maven.surefire.report.StackTraceWriter;
 import org.apache.maven.surefire.shared.utils.cli.StreamConsumer;
@@ -630,6 +631,8 @@ public class ForkStarter
             CommandlineStreams streams = exec.execute();
             closer.addCloseable( streams );
 
+            forkChannel.openChannel();
+
             in = forkChannel.useStdIn()
                 ? forkChannel.bindCommandReader( commandReader, streams.getStdInChannel() )
                 : forkChannel.bindCommandReader( commandReader );
@@ -639,12 +642,11 @@ public class ForkStarter
 
             out = forkChannel.useStdOut()
                 ? forkChannel.bindEventHandler( eventConsumer, streams.getStdOutChannel(), countdownCloseable )
-                : forkChannel.bindStdOutConsumer( stdErrConsumer );
+                : forkChannel.bindEventHandler( stdErrConsumer );
             out.start();
 
-            err = forkChannel.useStdErr()
-                ? forkChannel.bindStdErrConsumer( stdErrConsumer, streams.getStdErrChannel(), countdownCloseable )
-                : forkChannel.bindStdErrConsumer( stdErrConsumer );
+            err = new LineConsumerThread( "std-err-fork-" + forkNumber, streams.getStdErrChannel(),
+                stdErrConsumer, countdownCloseable );
             err.start();
 
             result = exec.awaitExit();
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java
index c462142..31ad325 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/LegacyForkChannel.java
@@ -42,25 +42,24 @@ final class LegacyForkChannel extends ForkChannel
     }
 
     @Override
-    public String getForkNodeConnectionString()
+    public void openChannel()
     {
-        return "pipe://" + getForkChannelId();
     }
 
     @Override
-    public boolean useStdIn()
+    public String getForkNodeConnectionString()
     {
-        return true;
+        return "pipe://" + getForkChannelId();
     }
 
     @Override
-    public boolean useStdOut()
+    public boolean useStdIn()
     {
         return true;
     }
 
     @Override
-    public boolean useStdErr()
+    public boolean useStdOut()
     {
         return true;
     }
@@ -87,21 +86,7 @@ final class LegacyForkChannel extends ForkChannel
     }
 
     @Override
-    public CloseableDaemonThread bindStdOutConsumer( @Nonnull StreamConsumer consumer )
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public CloseableDaemonThread bindStdErrConsumer(
-        @Nonnull StreamConsumer consumer,
-        @Nonnull ReadableByteChannel stdErr, @Nonnull CountdownCloseable countdownCloseable )
-    {
-        return new LineConsumerThread( "std-err-fork-" + getForkChannelId(), stdErr, consumer, countdownCloseable );
-    }
-
-    @Override
-    public CloseableDaemonThread bindStdErrConsumer( @Nonnull StreamConsumer consumer )
+    public CloseableDaemonThread bindEventHandler( @Nonnull StreamConsumer consumer )
     {
         throw new UnsupportedOperationException();
     }
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/NetworkingProcessExecutor.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/NetworkingProcessExecutor.java
deleted file mode 100644
index 1164ba8..0000000
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/NetworkingProcessExecutor.java
+++ /dev/null
@@ -1,216 +0,0 @@
-package org.apache.maven.plugin.surefire.extensions;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.maven.surefire.booter.Command;
-import org.apache.maven.surefire.booter.MasterProcessCommand;
-import org.apache.maven.surefire.extensions.CommandReader;
-import org.apache.maven.surefire.extensions.EventHandler;
-import org.apache.maven.surefire.extensions.StdErrStreamLine;
-import org.apache.maven.surefire.extensions.StdOutStreamLine;
-import org.apache.maven.surefire.shared.utils.cli.CommandLineUtils;
-import org.apache.maven.surefire.shared.utils.cli.Commandline;
-
-import javax.annotation.Nonnull;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousServerSocketChannel;
-import java.nio.channels.AsynchronousSocketChannel;
-import java.nio.channels.CompletionHandler;
-import java.util.Scanner;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-
-import static java.nio.ByteBuffer.wrap;
-import static java.nio.charset.StandardCharsets.US_ASCII;
-
-/**
- * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
- * @since 3.0.0-M4
- */
-final class NetworkingProcessExecutor
-{
-    private final AsynchronousServerSocketChannel server;
-    private final ExecutorService executorService;
-
-    NetworkingProcessExecutor( AsynchronousServerSocketChannel server, ExecutorService executorService )
-    {
-        this.server = server;
-        this.executorService = executorService;
-    }
-
-    @Nonnull
-    public Callable<Integer> executeCommandLineAsCallable( @Nonnull Commandline cli,
-                                                           @Nonnull final CommandReader commands,
-                                                           @Nonnull final EventHandler events,
-                                                           StdOutStreamLine stdOut,
-                                                           StdErrStreamLine stdErr,
-                                                           @Nonnull Runnable runAfterProcessTermination )
-            throws Exception
-    {
-        server.accept( null, new CompletionHandler<AsynchronousSocketChannel, Object>()
-        {
-            @Override
-            public void completed( final AsynchronousSocketChannel client, Object attachment )
-            {
-                executorService.submit( new Runnable()
-                {
-                    @Override
-                    public void run()
-                    {
-                        InputStream is = toInputStream( client );
-                        try
-                        {
-                            for ( Scanner scanner = new Scanner( is, "ASCII" ); scanner.hasNextLine(); )
-                            {
-                                if ( scanner.ioException() != null )
-                                {
-                                    break;
-                                }
-                                events.handleEvent( scanner.nextLine() );
-                            }
-                        }
-                        catch ( IllegalStateException e )
-                        {
-                            // scanner and InputStream is closed
-                            try
-                            {
-                                client.close();
-                            }
-                            catch ( IOException ex )
-                            {
-                                // couldn't close the client channel
-                            }
-                        }
-                    }
-                } );
-
-                executorService.submit( new Runnable()
-                {
-                    @Override
-                    public void run()
-                    {
-                        try
-                        {
-                            for ( Command cmd; !commands.isClosed();  )
-                            {
-                                cmd = commands.readNextCommand();
-                                if ( cmd == null )
-                                {
-                                    break;
-                                }
-                                MasterProcessCommand cmdType = cmd.getCommandType();
-                                byte[] b = cmdType.hasDataType() ? cmdType.encode( cmd.getData() ) : cmdType.encode();
-                                ByteBuffer bb = wrap( b );
-                                do
-                                {
-                                    client.write( bb ).get();
-                                }
-                                while ( bb.hasRemaining() );
-                            }
-                        }
-                        catch ( Exception e )
-                        {
-                            // finished stream or error
-                            try
-                            {
-                                client.close();
-                            }
-                            catch ( IOException ex )
-                            {
-                                // couldn't close the client channel
-                            }
-                        }
-                    }
-                } );
-            }
-
-            @Override
-            public void failed( Throwable exc, Object attachment )
-            {
-                // write to dump file
-                // close the server
-            }
-        } );
-
-        return CommandLineUtils.executeCommandLineAsCallable( cli, null,
-                new StdOutAdapter( stdOut ), stdErr, 0, runAfterProcessTermination, US_ASCII );
-    }
-
-    private static InputStream toInputStream( final AsynchronousSocketChannel client )
-    {
-        return new InputStream()
-        {
-            private final ByteBuffer bb = ByteBuffer.allocate( 64 * 1024 );
-            private boolean closed;
-
-            @Override
-            public int read() throws IOException
-            {
-                if ( closed )
-                {
-                    return -1;
-                }
-
-                try
-                {
-                    if ( !bb.hasRemaining() )
-                    {
-                        bb.clear();
-                        if ( client.read( bb ).get() == 0 )
-                        {
-                            closed = true;
-                            return -1;
-                        }
-                        bb.flip();
-                    }
-                    return bb.get();
-                }
-                catch ( InterruptedException e )
-                {
-                    closed = true;
-                    return -1;
-                }
-                catch ( ExecutionException e )
-                {
-                    closed = true;
-                    Throwable cause = e.getCause();
-                    if ( cause instanceof IOException )
-                    {
-                        throw (IOException) cause;
-                    }
-                    else
-                    {
-                        return -1;
-                    }
-                }
-            }
-
-            @Override
-            public void close() throws IOException
-            {
-                closed = true;
-                super.close();
-            }
-        };
-    }
-}
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/StdOutAdapter.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/StdOutAdapter.java
deleted file mode 100644
index 4ca8cf5..0000000
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/StdOutAdapter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package org.apache.maven.plugin.surefire.extensions;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.maven.surefire.extensions.StdOutStreamLine;
-import org.apache.maven.surefire.shared.utils.cli.StreamConsumer;
-
-/**
- *
- */
-final class StdOutAdapter implements StreamConsumer
-{
-    private final StdOutStreamLine stdOut;
-
-    StdOutAdapter( StdOutStreamLine stdOut )
-    {
-        this.stdOut = stdOut;
-    }
-
-    @Override
-    public void consumeLine( String line )
-    {
-        stdOut.handleLine( line );
-    }
-}
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
index e1932a9..214c9dc 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
@@ -23,44 +23,53 @@ import org.apache.maven.surefire.extensions.CloseableDaemonThread;
 import org.apache.maven.surefire.extensions.CommandReader;
 import org.apache.maven.surefire.extensions.ForkChannel;
 import org.apache.maven.surefire.extensions.util.CountdownCloseable;
+import org.apache.maven.surefire.extensions.util.LineConsumerThread;
+import org.apache.maven.surefire.extensions.util.StreamFeeder;
 import org.apache.maven.surefire.shared.utils.cli.StreamConsumer;
 
 import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketOption;
-import java.nio.channels.AsynchronousServerSocketChannel;
+import java.nio.channels.Channel;
 import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
 import java.nio.channels.WritableByteChannel;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import static java.net.StandardSocketOptions.SO_KEEPALIVE;
 import static java.net.StandardSocketOptions.SO_REUSEADDR;
 import static java.net.StandardSocketOptions.TCP_NODELAY;
-import static java.nio.channels.AsynchronousChannelGroup.withThreadPool;
-import static java.nio.channels.AsynchronousServerSocketChannel.open;
-import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThreadFactory;
+import static java.nio.channels.ServerSocketChannel.open;
 
 /**
  *
  */
 final class SurefireForkChannel extends ForkChannel
 {
-    private final ExecutorService executorService;
-    private final AsynchronousServerSocketChannel server;
+    private final ServerSocketChannel server;
     private final int serverPort;
+    private SocketChannel channel;
 
     SurefireForkChannel( int forkChannelId ) throws IOException
     {
         super( forkChannelId );
-        executorService = Executors.newCachedThreadPool( newDaemonThreadFactory() );
-        server = open( withThreadPool( executorService ) );
+        server = open();
         setTrueOptions( SO_REUSEADDR, TCP_NODELAY, SO_KEEPALIVE );
         server.bind( new InetSocketAddress( 0 ) );
         serverPort = ( (InetSocketAddress) server.getLocalAddress() ).getPort();
     }
 
+    @Override
+    public void openChannel() throws IOException
+    {
+        if ( channel != null )
+        {
+            throw new IllegalStateException( "already accepted TCP client connection" );
+        }
+        channel = server.accept();
+    }
+
     @SafeVarargs
     private final void setTrueOptions( SocketOption<Boolean>... options ) throws IOException
     {
@@ -92,12 +101,6 @@ final class SurefireForkChannel extends ForkChannel
     }
 
     @Override
-    public boolean useStdErr()
-    {
-        return false;
-    }
-
-    @Override
     public CloseableDaemonThread bindCommandReader( @Nonnull CommandReader commands,
                                                     @Nonnull WritableByteChannel stdIn )
     {
@@ -105,9 +108,9 @@ final class SurefireForkChannel extends ForkChannel
     }
 
     @Override
-    public CloseableDaemonThread bindCommandReader( @Nonnull CommandReader commands ) throws IOException
+    public CloseableDaemonThread bindCommandReader( @Nonnull CommandReader commands )
     {
-        return null;
+        return new StreamFeeder( "commands-fork-" + getForkChannelId(), channel, commands );
     }
 
     @Override
@@ -119,35 +122,19 @@ final class SurefireForkChannel extends ForkChannel
     }
 
     @Override
-    public CloseableDaemonThread bindStdOutConsumer( @Nonnull StreamConsumer consumer ) throws IOException
+    public CloseableDaemonThread bindEventHandler( @Nonnull StreamConsumer consumer )
     {
-        return null;
-    }
-
-    @Override
-    public CloseableDaemonThread bindStdErrConsumer( @Nonnull StreamConsumer consumer,
-                                                     @Nonnull ReadableByteChannel stdErr,
-                                                     @Nonnull CountdownCloseable countdownCloseable )
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public CloseableDaemonThread bindStdErrConsumer( @Nonnull StreamConsumer consumer ) throws IOException
-    {
-        return null;
+        CountdownCloseable countdownCloseable = new CountdownCloseable( null, 0 );
+        return new LineConsumerThread( "events-fork-" + getForkChannelId(), channel, consumer, countdownCloseable );
     }
 
     @Override
     public void close() throws IOException
     {
-        try
-        {
-            server.close();
-        }
-        finally
+        //noinspection EmptyTryBlock
+        try ( Channel c1 = channel; Channel c2 = server )
         {
-            executorService.shutdownNow();
+            // only close all channels
         }
     }
 }
diff --git a/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java b/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java
index 21cee75..14a28fe 100644
--- a/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java
+++ b/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java
@@ -51,6 +51,7 @@ import org.apache.maven.plugin.surefire.util.DirectoryScannerTest;
 import org.apache.maven.plugin.surefire.util.ScannerUtilTest;
 import org.apache.maven.plugin.surefire.util.SpecificFileFilterTest;
 import org.apache.maven.surefire.extensions.ConsoleOutputReporterTest;
+import org.apache.maven.surefire.extensions.ForkChannelTest;
 import org.apache.maven.surefire.extensions.StatelessReporterTest;
 import org.apache.maven.surefire.extensions.StatelessTestsetInfoReporterTest;
 import org.apache.maven.surefire.report.FileReporterTest;
@@ -104,6 +105,7 @@ public class JUnit4SuiteTest extends TestCase
         suite.addTest( new JUnit4TestAdapter( StatelessTestsetInfoReporterTest.class ) );
         suite.addTest( new JUnit4TestAdapter( CommonReflectorTest.class ) );
         suite.addTest( new JUnit4TestAdapter( ForkStarterTest.class ) );
+        suite.addTest( new JUnit4TestAdapter( ForkChannelTest.class ) );
         return suite;
     }
 }
diff --git a/maven-surefire-common/src/test/java/org/apache/maven/surefire/extensions/ForkChannelTest.java b/maven-surefire-common/src/test/java/org/apache/maven/surefire/extensions/ForkChannelTest.java
new file mode 100644
index 0000000..e8b0dcf
--- /dev/null
+++ b/maven-surefire-common/src/test/java/org/apache/maven/surefire/extensions/ForkChannelTest.java
@@ -0,0 +1,132 @@
+package org.apache.maven.surefire.extensions;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestLessInputStream;
+import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestLessInputStream.TestLessInputStreamBuilder;
+import org.apache.maven.plugin.surefire.extensions.SurefireForkNodeFactory;
+import org.apache.maven.surefire.shared.utils.cli.StreamConsumer;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.net.Socket;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.nio.charset.StandardCharsets.US_ASCII;
+import static org.fest.assertions.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
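+ * Tests that the TCP {@link ForkChannel} sends a command to a socket client and passes its reply to the consumer.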
+ */
+@RunWith( PowerMockRunner.class )
+@PowerMockIgnore( { "org.jacoco.agent.rt.*", "com.vladium.emma.rt.*" } )
+public class ForkChannelTest
+{
+    @Mock
+    private StreamConsumer consumer;
+
+    @Test( timeout = 30_000L )
+    public void test() throws Exception
+    {
+        ForkNodeFactory factory = new SurefireForkNodeFactory();
+        ForkChannel channel = factory.createForkChannel( 1 );
+
+        assertThat( channel.getForkChannelId() ).isEqualTo( 1 );
+        assertThat( channel.useStdIn() ).isFalse();
+        assertThat( channel.useStdOut() ).isFalse();
+        assertThat( channel.getForkNodeConnectionString() )
+            .startsWith( "tcp://127.0.0.1:" )
+            .isNotEqualTo( "tcp://127.0.0.1:" );
+
+        URI uri = new URI( channel.getForkNodeConnectionString() );
+        assertThat( uri.getPort() ).isPositive();
+
+        ArgumentCaptor<String> line = ArgumentCaptor.forClass( String.class );
+        doNothing().when( consumer ).consumeLine( anyString() );
+
+        Client client = new Client( uri.getPort() );
+        final AtomicBoolean hasError = new AtomicBoolean();
+        client.setUncaughtExceptionHandler( new UncaughtExceptionHandler()
+        {
+            @Override
+            public void uncaughtException( Thread t, Throwable e )
+            {
+                hasError.set( true );
+                e.printStackTrace( System.err );
+            }
+        } );
+        client.start();
+
+        channel.openChannel();
+        TimeUnit.SECONDS.sleep( 3L );
+
+        TestLessInputStreamBuilder builder = new TestLessInputStreamBuilder();
+        TestLessInputStream commandReader = builder.build();
+        commandReader.noop();
+        channel.bindCommandReader( commandReader ).start();
+        channel.bindEventHandler( consumer ).start();
+
+        client.join();
+
+        assertThat( hasError.get() ).isFalse();
+
+        verify( consumer, times( 1 ) ).consumeLine( line.capture() );
+        assertThat( line.getValue() ).isEqualTo( "Hi There!" );
+    }
+
+    private static class Client extends Thread
+    {
+        private final int port;
+
+        private Client( int port )
+        {
+            this.port = port;
+        }
+
+        @Override
+        public void run()
+        {
+            try ( Socket socket = new Socket( "127.0.0.1", port ) )
+            {
+                byte[] data = new byte[128];
+                int readLength = socket.getInputStream().read( data );
+                String token = new String( data, 0, readLength, US_ASCII );
+                assertThat( token ).isEqualTo( ":maven-surefire-std-out:noop:" );
+                socket.getOutputStream().write( "Hi There!".getBytes( US_ASCII ) );
+            }
+            catch ( IOException e )
+            {
+                throw new IllegalStateException( e );
+            }
+        }
+    }
+}
diff --git a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java
index da19dfc..5d9fbba 100644
--- a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java
+++ b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/ForkChannel.java
@@ -29,7 +29,6 @@ import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 
 /**
- * The constructor prepares I/O or throws {@link IOException}.
  */
 public abstract class ForkChannel implements Closeable
 {
@@ -44,6 +43,8 @@ public abstract class ForkChannel implements Closeable
         this.forkChannelId = forkChannelId;
     }
 
+    public abstract void openChannel() throws IOException;
+
     /**
      *
      * @return
@@ -64,12 +65,6 @@ public abstract class ForkChannel implements Closeable
 
     /**
      *
-     * @return
-     */
-    public abstract boolean useStdErr();
-
-    /**
-     *
      * @param commands
      * @param stdIn
      * @return
@@ -107,29 +102,7 @@ public abstract class ForkChannel implements Closeable
      * @return
      * @throws IOException
      */
-    public abstract CloseableDaemonThread bindStdOutConsumer( @Nonnull StreamConsumer consumer )
-        throws IOException;
-
-    /**
-     *
-     * @param consumer
-     * @param stdErr
-     * @param countdownCloseable
-     * @return
-     * @throws IOException
-     */
-    public abstract CloseableDaemonThread bindStdErrConsumer( @Nonnull StreamConsumer consumer,
-                                                              @Nonnull ReadableByteChannel stdErr,
-                                                              @Nonnull CountdownCloseable countdownCloseable )
-        throws IOException;
-
-    /**
-     *
-     * @param consumer
-     * @return
-     * @throws IOException
-     */
-    public abstract CloseableDaemonThread bindStdErrConsumer( @Nonnull StreamConsumer consumer )
+    public abstract CloseableDaemonThread bindEventHandler( @Nonnull StreamConsumer consumer )
         throws IOException;
 
     /**
diff --git a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CountdownCloseable.java b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CountdownCloseable.java
index 9818ec9..4bb5272 100644
--- a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CountdownCloseable.java
+++ b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CountdownCloseable.java
@@ -20,7 +20,6 @@ package org.apache.maven.surefire.extensions.util;
  */
 
 import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
 import java.io.Closeable;
 import java.io.IOException;
 
@@ -33,8 +32,12 @@ public final class CountdownCloseable
     private final Closeable closeable;
     private volatile int countdown;
 
-    public CountdownCloseable( @Nonnull Closeable closeable, @Nonnegative int countdown )
+    public CountdownCloseable( Closeable closeable, @Nonnegative int countdown )
     {
+        if ( closeable == null && countdown > 0 )
+        {
+            throw new IllegalStateException( "closeable is null and countdown is " + countdown );
+        }
         this.closeable = closeable;
         this.countdown = countdown;
     }