You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by ti...@apache.org on 2020/04/08 08:22:13 UTC

[maven-surefire] 05/18: fix after Enrico's findings in external project

This is an automated email from the ASF dual-hosted git repository.

tibordigana pushed a commit to branch maven2surefire-jvm-communication
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git

commit ff5b7c6926c302598b49488a0a727fdfbdb7949d
Author: tibordigana <ti...@apache.org>
AuthorDate: Sun Mar 22 01:47:34 2020 +0100

    fix after Enrico's findings in external project
---
 .../plugin/surefire/booterclient/ForkStarter.java  |   4 +-
 .../AbstractNoninterruptibleReadableChannel.java   |  69 ++++++
 .../AbstractNoninterruptibleWritableChannel.java   |  93 ++++++++
 .../maven/surefire/util/internal/Channels.java     | 122 ++++++++++
 .../java/org/apache/maven/JUnit4SuiteTest.java     |   6 +-
 .../surefire/util/internal/ChannelsReaderTest.java | 250 +++++++++++++++++++++
 .../surefire/util/internal/ChannelsWriterTest.java | 171 ++++++++++++++
 .../apache/maven/surefire/booter/ForkedBooter.java |   3 +
 .../spi/LegacyMasterProcessChannelEncoder.java     |  36 +--
 ...LegacyMasterProcessChannelProcessorFactory.java |   5 +-
 .../extensions/util/CommandlineStreams.java        |   6 +-
 .../util/FlushableWritableByteChannel.java         |  68 ------
 12 files changed, 742 insertions(+), 91 deletions(-)

diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
index a41384a..b27dacb 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/ForkStarter.java
@@ -637,9 +637,9 @@ public class ForkStarter
             out = forkChannel.bindEventHandler( eventConsumer, countdownCloseable, streams.getStdOutChannel() );
             out.start();
 
-            EventHandler<String> stdErrConsumer = new NativeStdErrStreamConsumer( reporter );
+            EventHandler<String> errConsumer = new NativeStdErrStreamConsumer( reporter );
             err = new LineConsumerThread( "fork-" + forkNumber + "-err-thread-", streams.getStdErrChannel(),
-                stdErrConsumer, countdownCloseable );
+                errConsumer, countdownCloseable );
             err.start();
 
             result = exec.awaitExit();
diff --git a/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/AbstractNoninterruptibleReadableChannel.java b/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/AbstractNoninterruptibleReadableChannel.java
new file mode 100644
index 0000000..1a75972
--- /dev/null
+++ b/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/AbstractNoninterruptibleReadableChannel.java
@@ -0,0 +1,69 @@
+package org.apache.maven.surefire.util.internal;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.NonReadableChannelException;
+import java.nio.channels.ReadableByteChannel;
+
+/**
+ * The channel used for reads which cannot be implicitly closed after the operational Thread
+ * is {@link Thread#isInterrupted() interrupted}.
+ *
+ * @since 3.0.0-M5
+ */
+abstract class AbstractNoninterruptibleReadableChannel implements ReadableByteChannel
+{
+    private volatile boolean open = true;
+
+    protected abstract int readImpl( ByteBuffer src ) throws IOException;
+    protected abstract void closeImpl() throws IOException;
+
+    @Override
+    public final int read( ByteBuffer src ) throws IOException
+    {
+        if ( !isOpen() )
+        {
+            throw new ClosedChannelException();
+        }
+
+        if ( !src.hasArray() || src.isReadOnly() )
+        {
+            throw new NonReadableChannelException();
+        }
+
+        return src.hasRemaining() ? readImpl( src ) : 0;
+    }
+
+    @Override
+    public final boolean isOpen()
+    {
+        return open;
+    }
+
+    @Override
+    public final void close() throws IOException
+    {
+        open = false;
+        closeImpl();
+    }
+}
diff --git a/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/AbstractNoninterruptibleWritableChannel.java b/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/AbstractNoninterruptibleWritableChannel.java
new file mode 100644
index 0000000..cb08e34
--- /dev/null
+++ b/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/AbstractNoninterruptibleWritableChannel.java
@@ -0,0 +1,93 @@
+package org.apache.maven.surefire.util.internal;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.Flushable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.NonWritableChannelException;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * The channel used for writes which cannot be implicitly closed after the operational Thread
+ * is {@link Thread#isInterrupted() interrupted}.
+ *
+ * @since 3.0.0-M5
+ */
+abstract class AbstractNoninterruptibleWritableChannel implements WritableByteChannel, Flushable
+{
+    private final boolean flushable;
+    private volatile boolean open = true;
+
+    AbstractNoninterruptibleWritableChannel( boolean flushable )
+    {
+        this.flushable = flushable;
+    }
+
+    protected abstract void writeImpl( ByteBuffer src ) throws IOException;
+    protected abstract void closeImpl() throws IOException;
+
+    @Override
+    public final synchronized int write( ByteBuffer src ) throws IOException
+    {
+        if ( !isOpen() )
+        {
+            throw new ClosedChannelException();
+        }
+
+        if ( !src.hasArray() || src.isReadOnly() )
+        {
+            throw new NonWritableChannelException();
+        }
+
+        if ( src.remaining() != src.capacity() )
+        {
+            src.flip();
+        }
+
+        int countWrittenBytes = 0;
+
+        if ( src.hasRemaining() )
+        {
+            countWrittenBytes = src.remaining();
+            writeImpl( src );
+            src.position( src.limit() );
+            if ( flushable )
+            {
+                flush();
+            }
+        }
+        return countWrittenBytes;
+    }
+
+    @Override
+    public final boolean isOpen()
+    {
+        return open;
+    }
+
+    @Override
+    public final void close() throws IOException
+    {
+        open = false;
+        closeImpl();
+    }
+}
diff --git a/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/Channels.java b/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/Channels.java
new file mode 100644
index 0000000..65cb9b4
--- /dev/null
+++ b/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/Channels.java
@@ -0,0 +1,122 @@
+package org.apache.maven.surefire.util.internal;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import javax.annotation.Nonnull;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Converts {@link OutputStream}, {@link java.io.PrintStream}, {@link InputStream}
+ * to the Java {@link java.nio.channels.Channel}.
+ * <br>
+ * We do not use the Java's utility class {@link java.nio.channels.Channels} because the utility
+ * closes the stream as soon as the particular Thread is interrupted.
+ * If the frameworks (Zookeeper, Netty) interrupts the thread, the communication channels become
+ * closed and the JVM hangs. Therefore we developed internal utility which is safe for the Surefire.
+ *
+ * @since 3.0.0-M5
+ */
+public final class Channels
+{
+    private Channels()
+    {
+        throw new IllegalStateException( "no instantiable constructor" );
+    }
+
+    public static WritableByteChannel newChannel( @Nonnull  OutputStream out )
+    {
+        return newChannel( out, false );
+    }
+
+    public static WritableByteChannel newFlushableChannel( @Nonnull OutputStream out )
+    {
+        return newChannel( out, true );
+    }
+
+    public static ReadableByteChannel newChannel( @Nonnull final InputStream is )
+    {
+        requireNonNull( is, "the stream should not be null" );
+
+        if ( is instanceof FileInputStream && FileInputStream.class.equals( is.getClass() ) )
+        {
+            return ( (FileInputStream) is ).getChannel();
+        }
+
+        return new AbstractNoninterruptibleReadableChannel()
+        {
+            @Override
+            protected int readImpl( ByteBuffer src ) throws IOException
+            {
+                int count = is.read( src.array(), src.arrayOffset() + src.position(), src.remaining() );
+                if ( count > 0 )
+                {
+                    src.position( count + src.position() );
+                }
+                return count;
+            }
+
+            @Override
+            protected void closeImpl() throws IOException
+            {
+                is.close();
+            }
+        };
+    }
+
+    private static WritableByteChannel newChannel( @Nonnull final OutputStream out, final boolean flushable )
+    {
+        requireNonNull( out, "the stream should not be null" );
+
+        if ( out instanceof FileOutputStream && FileOutputStream.class.equals( out.getClass() ) )
+        {
+            return ( (FileOutputStream) out ).getChannel();
+        }
+
+        return new AbstractNoninterruptibleWritableChannel( flushable )
+        {
+            @Override
+            protected void writeImpl( ByteBuffer src ) throws IOException
+            {
+                out.write( src.array(), src.arrayOffset() + src.position(), src.remaining() );
+            }
+
+            @Override
+            protected void closeImpl() throws IOException
+            {
+                out.close();
+            }
+
+            @Override
+            public void flush() throws IOException
+            {
+                out.flush();
+            }
+        };
+    }
+}
diff --git a/surefire-api/src/test/java/org/apache/maven/JUnit4SuiteTest.java b/surefire-api/src/test/java/org/apache/maven/JUnit4SuiteTest.java
index 66a95a6..650807f 100644
--- a/surefire-api/src/test/java/org/apache/maven/JUnit4SuiteTest.java
+++ b/surefire-api/src/test/java/org/apache/maven/JUnit4SuiteTest.java
@@ -35,6 +35,8 @@ import org.apache.maven.surefire.util.RunOrderCalculatorTest;
 import org.apache.maven.surefire.util.RunOrderTest;
 import org.apache.maven.surefire.util.ScanResultTest;
 import org.apache.maven.surefire.util.TestsToRunTest;
+import org.apache.maven.surefire.util.internal.ChannelsReaderTest;
+import org.apache.maven.surefire.util.internal.ChannelsWriterTest;
 import org.apache.maven.surefire.util.internal.ConcurrencyUtilsTest;
 import org.apache.maven.surefire.util.internal.ImmutableMapTest;
 import org.junit.runner.RunWith;
@@ -62,7 +64,9 @@ import org.junit.runners.Suite;
     SpecificTestClassFilterTest.class,
     FundamentalFilterTest.class,
     ImmutableMapTest.class,
-    ReflectionUtilsTest.class
+    ReflectionUtilsTest.class,
+    ChannelsReaderTest.class,
+    ChannelsWriterTest.class
 } )
 @RunWith( Suite.class )
 public class JUnit4SuiteTest
diff --git a/surefire-api/src/test/java/org/apache/maven/surefire/util/internal/ChannelsReaderTest.java b/surefire-api/src/test/java/org/apache/maven/surefire/util/internal/ChannelsReaderTest.java
new file mode 100644
index 0000000..7e3a685
--- /dev/null
+++ b/surefire-api/src/test/java/org/apache/maven/surefire/util/internal/ChannelsReaderTest.java
@@ -0,0 +1,250 @@
+package org.apache.maven.surefire.util.internal;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.NonReadableChannelException;
+import java.nio.channels.ReadableByteChannel;
+
+import static java.nio.file.Files.write;
+import static org.fest.assertions.Assertions.assertThat;
+
+/**
+ * The tests for {@link Channels#newChannel(InputStream)}.
+ */
+public class ChannelsReaderTest
+{
+    @Rule
+    public final ExpectedException ee = ExpectedException.none();
+
+    @Rule
+    public final TemporaryFolder tmp = TemporaryFolder.builder()
+        .assureDeletion()
+        .build();
+
+    @Test
+    public void exactBufferSize() throws Exception
+    {
+        ByteArrayInputStream is = new ByteArrayInputStream( new byte[] {1, 2, 3} );
+        ReadableByteChannel channel = Channels.newChannel( is );
+        ByteBuffer bb = ByteBuffer.allocate( 3 );
+
+        int countWritten = channel.read( bb );
+
+        assertThat( countWritten )
+            .isEqualTo( 3 );
+
+        assertThat( bb.arrayOffset() )
+            .isEqualTo( 0 );
+
+        assertThat( bb.position() )
+            .isEqualTo( 3 );
+
+        assertThat( bb.remaining() )
+            .isEqualTo( 0 );
+
+        assertThat( bb.limit() )
+            .isEqualTo( 3 );
+
+        assertThat( bb.capacity() )
+            .isEqualTo( 3 );
+
+        bb.flip();
+
+        assertThat( bb.arrayOffset() )
+            .isEqualTo( 0 );
+
+        assertThat( bb.position() )
+            .isEqualTo( 0 );
+
+        assertThat( bb.remaining() )
+            .isEqualTo( 3 );
+
+        assertThat( bb.limit() )
+            .isEqualTo( 3 );
+
+        assertThat( bb.capacity() )
+            .isEqualTo( 3 );
+
+        assertThat( bb.array() )
+            .isEqualTo( new byte[] {1, 2, 3} );
+
+        assertThat( channel.isOpen() )
+            .isTrue();
+
+        channel.close();
+
+        assertThat( channel.isOpen() )
+            .isFalse();
+    }
+
+    @Test
+    public void biggerBuffer() throws Exception
+    {
+        ByteArrayInputStream is = new ByteArrayInputStream( new byte[] {1, 2, 3} );
+        ReadableByteChannel channel = Channels.newChannel( is );
+        ByteBuffer bb = ByteBuffer.allocate( 4 );
+
+        int countWritten = channel.read( bb );
+
+        assertThat( countWritten )
+            .isEqualTo( 3 );
+
+        assertThat( bb.arrayOffset() )
+            .isEqualTo( 0 );
+
+        assertThat( bb.position() )
+            .isEqualTo( 3 );
+
+        assertThat( bb.remaining() )
+            .isEqualTo( 1 );
+
+        assertThat( bb.limit() )
+            .isEqualTo( 4 );
+
+        assertThat( bb.capacity() )
+            .isEqualTo( 4 );
+
+        bb.flip();
+
+        assertThat( bb.arrayOffset() )
+            .isEqualTo( 0 );
+
+        assertThat( bb.position() )
+            .isEqualTo( 0 );
+
+        assertThat( bb.remaining() )
+            .isEqualTo( 3 );
+
+        assertThat( bb.limit() )
+            .isEqualTo( 3 );
+
+        assertThat( bb.capacity() )
+            .isEqualTo( 4 );
+
+        assertThat( bb.array() )
+            .isEqualTo( new byte[] {1, 2, 3, 0} );
+
+        assertThat( channel.isOpen() )
+            .isTrue();
+
+        channel.close();
+
+        assertThat( channel.isOpen() )
+            .isFalse();
+    }
+
+    @Test
+    public void shouldFailAfterClosed() throws IOException
+    {
+        ByteArrayInputStream is = new ByteArrayInputStream( new byte[] {1, 2, 3} );
+        ReadableByteChannel channel = Channels.newChannel( is );
+        channel.close();
+        assertThat( channel.isOpen() ).isFalse();
+        ee.expect( ClosedChannelException.class );
+        channel.read( ByteBuffer.allocate( 0 ) );
+    }
+
+    @Test
+    public void shouldFailIfNotReadable() throws IOException
+    {
+        ByteArrayInputStream is = new ByteArrayInputStream( new byte[] {1, 2, 3} );
+        ReadableByteChannel channel = Channels.newChannel( is );
+        ee.expect( NonReadableChannelException.class );
+        channel.read( ByteBuffer.allocate( 0 ).asReadOnlyBuffer() );
+    }
+
+    @Test
+    public void shouldFailIOnDirectBuffer() throws IOException
+    {
+        ByteArrayInputStream is = new ByteArrayInputStream( new byte[] {1, 2, 3} );
+        ReadableByteChannel channel = Channels.newChannel( is );
+        ee.expect( NonReadableChannelException.class );
+        channel.read( ByteBuffer.allocateDirect( 0 ) );
+    }
+
+    @Test
+    public void shouldUseFileChannel() throws IOException
+    {
+        File f = tmp.newFile();
+        write( f.toPath(), new byte[] {1, 2, 3} );
+        FileInputStream is = new FileInputStream( f );
+        ReadableByteChannel channel = Channels.newChannel( is );
+        ByteBuffer bb = ByteBuffer.allocate( 4 );
+        int countWritten = channel.read( bb );
+
+        assertThat( channel.isOpen() )
+            .isTrue();
+
+        channel.close();
+
+        assertThat( channel.isOpen() )
+            .isFalse();
+
+        assertThat( countWritten )
+            .isEqualTo( 3 );
+
+        assertThat( bb.arrayOffset() )
+            .isEqualTo( 0 );
+
+        assertThat( bb.position() )
+            .isEqualTo( 3 );
+
+        assertThat( bb.remaining() )
+            .isEqualTo( 1 );
+
+        assertThat( bb.limit() )
+            .isEqualTo( 4 );
+
+        assertThat( bb.capacity() )
+            .isEqualTo( 4 );
+
+        bb.flip();
+
+        assertThat( bb.arrayOffset() )
+            .isEqualTo( 0 );
+
+        assertThat( bb.position() )
+            .isEqualTo( 0 );
+
+        assertThat( bb.remaining() )
+            .isEqualTo( 3 );
+
+        assertThat( bb.limit() )
+            .isEqualTo( 3 );
+
+        assertThat( bb.capacity() )
+            .isEqualTo( 4 );
+
+        assertThat( bb.array() )
+            .isEqualTo( new byte[] {1, 2, 3, 0} );
+    }
+}
diff --git a/surefire-api/src/test/java/org/apache/maven/surefire/util/internal/ChannelsWriterTest.java b/surefire-api/src/test/java/org/apache/maven/surefire/util/internal/ChannelsWriterTest.java
new file mode 100644
index 0000000..a1f6a66
--- /dev/null
+++ b/surefire-api/src/test/java/org/apache/maven/surefire/util/internal/ChannelsWriterTest.java
@@ -0,0 +1,171 @@
+package org.apache.maven.surefire.util.internal;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.NonWritableChannelException;
+import java.nio.channels.WritableByteChannel;
+
+import static java.nio.file.Files.readAllBytes;
+import static org.fest.assertions.Assertions.assertThat;
+
+/**
+ * The tests for {@link Channels#newChannel(OutputStream)} and {@link Channels#newFlushableChannel(OutputStream)}.
+ */
+public class ChannelsWriterTest
+{
+    @Rule
+    public final ExpectedException ee = ExpectedException.none();
+
+    @Rule
+    public final TemporaryFolder tmp = TemporaryFolder.builder()
+        .assureDeletion()
+        .build();
+
+    @Test
+    public void wrappedBuffer() throws Exception
+    {
+        final boolean[] isFlush = {false};
+        ByteArrayOutputStream out = new ByteArrayOutputStream()
+        {
+            @Override
+            public void flush() throws IOException
+            {
+                isFlush[0] = true;
+                super.flush();
+            }
+        };
+        WritableByteChannel channel = Channels.newFlushableChannel( out );
+        ByteBuffer bb = ByteBuffer.wrap( new byte[] {1, 2, 3} );
+        int countWritten = channel.write( bb );
+        assertThat( countWritten )
+            .isEqualTo( 3 );
+
+        assertThat( out.toByteArray() )
+            .hasSize( 3 )
+            .isEqualTo( new byte[] {1, 2, 3} );
+
+        assertThat( isFlush )
+            .hasSize( 1 )
+            .containsOnly( true );
+
+        assertThat( bb.position() )
+            .isEqualTo( 3 );
+
+        assertThat( bb.limit() )
+            .isEqualTo( 3 );
+
+        assertThat( bb.capacity() )
+            .isEqualTo( 3 );
+
+        assertThat( channel.isOpen() )
+            .isTrue();
+    }
+
+    @Test
+    public void bigBuffer() throws Exception
+    {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        WritableByteChannel channel = Channels.newChannel( out );
+        ByteBuffer bb = ByteBuffer.allocate( 4 );
+        bb.put( (byte) 1 );
+        bb.put( (byte) 2 );
+        bb.put( (byte) 3 );
+        int countWritten = channel.write( bb );
+        assertThat( countWritten ).isEqualTo( 3 );
+        assertThat( out.toByteArray() )
+            .hasSize( 3 )
+            .isEqualTo( new byte[] {1, 2, 3} );
+
+        assertThat( bb.position() )
+            .isEqualTo( 3 );
+
+        assertThat( bb.limit() )
+            .isEqualTo( 3 );
+
+        assertThat( bb.capacity() )
+            .isEqualTo( 4 );
+
+        assertThat( channel.isOpen() )
+            .isTrue();
+    }
+
+    @Test
+    public void shouldFailAfterClosed() throws IOException
+    {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        WritableByteChannel channel = Channels.newChannel( out );
+        channel.close();
+        assertThat( channel.isOpen() ).isFalse();
+        ee.expect( ClosedChannelException.class );
+        channel.write( ByteBuffer.allocate( 0 ) );
+    }
+
+    @Test
+    public void shouldFailIfNotReadable() throws IOException
+    {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        WritableByteChannel channel = Channels.newChannel( out );
+        ee.expect( NonWritableChannelException.class );
+        channel.write( ByteBuffer.allocate( 0 ).asReadOnlyBuffer() );
+    }
+
+    @Test
+    public void shouldFailIOnDirectBuffer() throws IOException
+    {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        WritableByteChannel channel = Channels.newChannel( out );
+        ee.expect( NonWritableChannelException.class );
+        channel.write( ByteBuffer.allocateDirect( 0 ) );
+    }
+
+    @Test
+    public void shouldUseFileChannel() throws IOException
+    {
+        File f = tmp.newFile();
+        FileOutputStream os = new FileOutputStream( f );
+        WritableByteChannel channel = Channels.newChannel( os );
+        ByteBuffer bb = ByteBuffer.wrap( new byte[] {1, 2, 3} );
+        channel.write( bb );
+
+        assertThat( channel.isOpen() )
+            .isTrue();
+
+        channel.close();
+
+        assertThat( channel.isOpen() )
+            .isFalse();
+
+        assertThat( readAllBytes( f.toPath() ) )
+            .hasSize( 3 )
+            .isEqualTo( new byte[] {1, 2, 3} );
+    }
+}
diff --git a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/ForkedBooter.java b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/ForkedBooter.java
index 34b752e..46d9d74 100644
--- a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/ForkedBooter.java
+++ b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/ForkedBooter.java
@@ -382,6 +382,9 @@ public final class ForkedBooter
 
     private void acknowledgedExit()
     {
+        //noinspection ResultOfMethodCallIgnored
+        Thread.interrupted();
+
         commandReader.addByeAckListener( new CommandListener()
                                           {
                                               @Override
diff --git a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelEncoder.java b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelEncoder.java
index a9bd41a..a85e4ef 100644
--- a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelEncoder.java
+++ b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelEncoder.java
@@ -289,23 +289,27 @@ public class LegacyMasterProcessChannelEncoder implements MasterProcessChannelEn
 
     private void encodeAndPrintEvent( StringBuilder event )
     {
-        byte[] array = event.append( '\n' ).toString().getBytes( STREAM_ENCODING );
-        synchronized ( out )
+        try
         {
-            try
-            {
-                out.write( ByteBuffer.wrap( array ) );
-            }
-            catch ( ClosedChannelException e )
-            {
-                DumpErrorSingleton.getSingleton()
-                    .dumpText( "Channel closed while writing the event '" + event + "'." );
-            }
-            catch ( IOException e )
-            {
-                DumpErrorSingleton.getSingleton().dumpException( e );
-                trouble = true;
-            }
+            //noinspection ResultOfMethodCallIgnored
+            Thread.interrupted();
+
+            byte[] array = event.append( '\n' )
+                .toString()
+                .getBytes( STREAM_ENCODING );
+
+            out.write( ByteBuffer.wrap( array ) );
+        }
+        catch ( ClosedChannelException e )
+        {
+            DumpErrorSingleton.getSingleton()
+                .dumpException( e, "Channel closed while writing the event '" + event + "'." );
+        }
+        catch ( IOException e )
+        {
+            DumpErrorSingleton.getSingleton()
+                .dumpException( e );
+            trouble = true;
         }
     }
 
diff --git a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelProcessorFactory.java b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelProcessorFactory.java
index c6e11da..5713b99 100644
--- a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelProcessorFactory.java
+++ b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelProcessorFactory.java
@@ -26,7 +26,8 @@ import org.apache.maven.surefire.spi.MasterProcessChannelProcessorFactory;
 import java.io.IOException;
 import java.net.MalformedURLException;
 
-import static java.nio.channels.Channels.newChannel;
+import static org.apache.maven.surefire.util.internal.Channels.newChannel;
+import static org.apache.maven.surefire.util.internal.Channels.newFlushableChannel;
 
 /**
  * Producer of encoder and decoder for process pipes.
@@ -62,7 +63,7 @@ public class LegacyMasterProcessChannelProcessorFactory
     @Override
     public MasterProcessChannelEncoder createEncoder()
     {
-        return new LegacyMasterProcessChannelEncoder( newChannel( System.out ) );
+        return new LegacyMasterProcessChannelEncoder( newFlushableChannel( System.out ) );
     }
 
     @Override
diff --git a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CommandlineStreams.java b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CommandlineStreams.java
index 43ec328..18dae45 100644
--- a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CommandlineStreams.java
+++ b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/CommandlineStreams.java
@@ -27,8 +27,8 @@ import java.nio.channels.Channel;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 
-import static java.nio.channels.Channels.newChannel;
-import static org.apache.maven.surefire.extensions.util.FlushableWritableByteChannel.newFlushableChannel;
+import static org.apache.maven.surefire.util.internal.Channels.newChannel;
+import static org.apache.maven.surefire.util.internal.Channels.newFlushableChannel;
 
 /**
  *
@@ -44,8 +44,10 @@ public final class CommandlineStreams implements Closeable
     {
         InputStream stdOutStream = process.getInputStream();
         stdOutChannel = newChannel( stdOutStream );
+
         InputStream stdErrStream = process.getErrorStream();
         stdErrChannel = newChannel( stdErrStream );
+
         stdInChannel = newFlushableChannel( process.getOutputStream() );
     }
 
diff --git a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/FlushableWritableByteChannel.java b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/FlushableWritableByteChannel.java
deleted file mode 100644
index e4112f2..0000000
--- a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/util/FlushableWritableByteChannel.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.apache.maven.surefire.extensions.util;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import javax.annotation.Nonnull;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
-
-import static java.nio.channels.Channels.newChannel;
-
-/**
- *
- */
-final class FlushableWritableByteChannel implements WritableByteChannel
-{
-    private final OutputStream os;
-    private final WritableByteChannel channel;
-
-    private FlushableWritableByteChannel( @Nonnull OutputStream os )
-    {
-        this.os = os;
-        this.channel = newChannel( os );
-    }
-
-    static synchronized WritableByteChannel newFlushableChannel( OutputStream os )
-    {
-        return new FlushableWritableByteChannel( os );
-    }
-
-    @Override
-    public int write( ByteBuffer src ) throws IOException
-    {
-        int countWrittenBytes = channel.write( src );
-        os.flush();
-        return countWrittenBytes;
-    }
-
-    @Override
-    public boolean isOpen()
-    {
-        return channel.isOpen();
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-        channel.close();
-    }
-}