You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by ti...@apache.org on 2019/11/09 02:18:46 UTC

[maven-surefire] branch maven2surefire-jvm-communication updated (a69ad60 -> f92816d)

This is an automated email from the ASF dual-hosted git repository.

tibordigana pushed a change to branch maven2surefire-jvm-communication
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git.


    from a69ad60  resolved the previous problems
     new 33bbb50  improved DaemonThreadFactory
     new 795bda5  small two changes
     new f92816d  TCP server

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../extensions/NetworkingProcessExecutor.java      | 180 ++++++++++++++++++---
 .../surefire/extensions/PipeProcessExecutor.java   |   2 +-
 .../surefire/extensions/SurefireForkChannel.java   |  33 +++-
 .../util/internal/DaemonThreadFactory.java         |  40 +----
 .../maven/surefire/extensions/CommandReader.java   |   4 +-
 5 files changed, 194 insertions(+), 65 deletions(-)


[maven-surefire] 03/03: TCP server

Posted by ti...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tibordigana pushed a commit to branch maven2surefire-jvm-communication
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git

commit f92816de454102dc79cc81eb0ae1b69571c99f49
Author: tibordigana <ti...@apache.org>
AuthorDate: Sat Nov 9 03:18:18 2019 +0100

    TCP server
---
 .../extensions/NetworkingProcessExecutor.java      | 180 ++++++++++++++++++---
 .../surefire/extensions/SurefireForkChannel.java   |  33 +++-
 2 files changed, 185 insertions(+), 28 deletions(-)

diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/NetworkingProcessExecutor.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/NetworkingProcessExecutor.java
index d640e02..c5da33b 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/NetworkingProcessExecutor.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/NetworkingProcessExecutor.java
@@ -21,6 +21,8 @@ package org.apache.maven.plugin.surefire.extensions;
 
 import org.apache.maven.shared.utils.cli.CommandLineUtils;
 import org.apache.maven.shared.utils.cli.Commandline;
+import org.apache.maven.surefire.booter.Command;
+import org.apache.maven.surefire.booter.MasterProcessCommand;
 import org.apache.maven.surefire.extensions.CommandReader;
 import org.apache.maven.surefire.extensions.EventHandler;
 import org.apache.maven.surefire.extensions.ExecutableCommandline;
@@ -28,9 +30,19 @@ import org.apache.maven.surefire.extensions.StdErrStreamLine;
 import org.apache.maven.surefire.extensions.StdOutStreamLine;
 
 import javax.annotation.Nonnull;
-import java.net.ServerSocket;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousServerSocketChannel;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.CompletionHandler;
+import java.util.Scanner;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
+import static java.nio.ByteBuffer.wrap;
 import static java.nio.charset.StandardCharsets.US_ASCII;
 
 /**
@@ -39,42 +51,168 @@ import static java.nio.charset.StandardCharsets.US_ASCII;
  */
 final class NetworkingProcessExecutor implements ExecutableCommandline
 {
-    private final ServerSocket ss;
+    private final AsynchronousServerSocketChannel server;
+    private final ExecutorService executorService;
 
-    NetworkingProcessExecutor( ServerSocket ss )
+    NetworkingProcessExecutor( AsynchronousServerSocketChannel server, ExecutorService executorService )
     {
-        this.ss = ss;
+        this.server = server;
+        this.executorService = executorService;
     }
 
     @Nonnull
     @Override
     public <T> Callable<Integer> executeCommandLineAsCallable( @Nonnull T cli,
-                                                               @Nonnull CommandReader commands,
-                                                               @Nonnull EventHandler events,
+                                                               @Nonnull final CommandReader commands,
+                                                               @Nonnull final EventHandler events,
                                                                StdOutStreamLine stdOut,
                                                                StdErrStreamLine stdErr,
                                                                @Nonnull Runnable runAfterProcessTermination )
             throws Exception
     {
-        /*
-        Call in Threads:
-
-        Socket s = ss.accept();
-
-        for ( Scanner scanner = new Scanner( s.getInputStream(), "ASCII" ); scanner.hasNextLine(); )
+        server.accept( null, new CompletionHandler<AsynchronousSocketChannel, Object>()
         {
-            events.handleEvent( scanner.nextLine() );
-        }
+            @Override
+            public void completed( final AsynchronousSocketChannel client, Object attachment )
+            {
+                executorService.submit( new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        InputStream is = toInputStream( client );
+                        try
+                        {
+                            for ( Scanner scanner = new Scanner( is, "ASCII" ); scanner.hasNextLine(); )
+                            {
+                                if ( scanner.ioException() != null )
+                                {
+                                    break;
+                                }
+                                events.handleEvent( scanner.nextLine() );
+                            }
+                        }
+                        catch ( IllegalStateException e )
+                        {
+                            // scanner and InputStream is closed
+                            try
+                            {
+                                client.close();
+                            }
+                            catch ( IOException ex )
+                            {
+                                // couldn't close the client channel
+                            }
+                        }
+                    }
+                } );
 
-        Command cmd = commands.readNextCommand();
-        if ( cmd != null )
-        {
-            MasterProcessCommand cmdType = cmd.getCommandType();
-            s.getOutputStream()
-                    .write( cmdType.hasDataType() ? cmdType.encode( cmd.getData() ) : cmdType.encode() );
-        }*/
+                executorService.submit( new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+                            for ( Command cmd; ( cmd = commands.readNextCommand() ) != null;  )
+                            {
+                                MasterProcessCommand cmdType = cmd.getCommandType();
+                                byte[] b = cmdType.hasDataType() ? cmdType.encode( cmd.getData() ) : cmdType.encode();
+                                ByteBuffer bb = wrap( b );
+                                int writtenBytesTotal = 0;
+                                do
+                                {
+                                    Future<Integer> writtenBytes = client.write( bb );
+                                    int writtenCount = writtenBytes.get();
+                                    writtenBytesTotal += writtenCount;
+                                }
+                                while ( writtenBytesTotal < bb.limit() );
+                            }
+                        }
+                        catch ( Exception e )
+                        {
+                            // finished stream or error
+                            try
+                            {
+                                client.close();
+                            }
+                            catch ( IOException ex )
+                            {
+                                // couldn't close the client channel
+                            }
+                        }
+                    }
+                } );
+            }
+
+            @Override
+            public void failed( Throwable exc, Object attachment )
+            {
+                // write to dump file
+                // close the server
+            }
+        } );
 
         return CommandLineUtils.executeCommandLineAsCallable( (Commandline) cli, null,
                 new StdOutAdapter( stdOut ), new StdErrAdapter( stdErr ), 0, runAfterProcessTermination, US_ASCII );
     }
+
+    private static InputStream toInputStream( final AsynchronousSocketChannel client )
+    {
+        return new InputStream()
+        {
+            private final ByteBuffer bb = ByteBuffer.allocate( 64 * 1024 );
+            private int read;
+            private boolean closed;
+
+            @Override
+            public int read() throws IOException
+            {
+                if ( closed )
+                {
+                    return -1;
+                }
+
+                try
+                {
+                    if ( read == 0 )
+                    {
+                        bb.clear();
+                        read = client.read( bb ).get();
+                        if ( read == 0 )
+                        {
+                            return -1;
+                        }
+                    }
+                    read--;
+                    return bb.get();
+                }
+                catch ( InterruptedException e )
+                {
+                    closed = true;
+                    return -1;
+                }
+                catch ( ExecutionException e )
+                {
+                    closed = true;
+                    Throwable cause = e.getCause();
+                    if ( cause instanceof IOException )
+                    {
+                        throw (IOException) cause;
+                    }
+                    else
+                    {
+                        return -1;
+                    }
+                }
+            }
+
+            @Override
+            public void close() throws IOException
+            {
+                closed = true;
+                super.close();
+            }
+        };
+    }
 }
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
index da35ac4..fe5e2a2 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/SurefireForkChannel.java
@@ -24,36 +24,55 @@ import org.apache.maven.surefire.extensions.ForkChannel;
 
 import javax.annotation.Nonnull;
 import java.io.IOException;
-import java.net.ServerSocket;
+import java.net.InetSocketAddress;
+import java.nio.channels.AsynchronousServerSocketChannel;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static java.nio.channels.AsynchronousChannelGroup.withThreadPool;
+import static java.nio.channels.AsynchronousServerSocketChannel.open;
+import static org.apache.maven.surefire.util.internal.DaemonThreadFactory.newDaemonThreadFactory;
 
 /**
  *
  */
 final class SurefireForkChannel implements ForkChannel
 {
-    private final ServerSocket ss;
+    private final ExecutorService executorService;
+    private final AsynchronousServerSocketChannel server;
+    private final int serverPort;
 
     SurefireForkChannel() throws IOException
     {
-        ss = new ServerSocket( 0 );
+        executorService = Executors.newCachedThreadPool( newDaemonThreadFactory() );
+        server = open( withThreadPool( executorService ) )
+                .bind( new InetSocketAddress( 0 ) );
+        serverPort = ( (InetSocketAddress) server.getLocalAddress() ).getPort();
     }
 
     @Override
     public String getForkNodeConnectionString()
     {
-        return "tcp://127.0.0.1:" + ss.getLocalPort();
+        return "tcp://127.0.0.1:" + serverPort;
     }
 
     @Nonnull
     @Override
-    public ExecutableCommandline createExecutableCommandline() throws IOException
+    public ExecutableCommandline createExecutableCommandline()
     {
-        return new NetworkingProcessExecutor( ss );
+        return new NetworkingProcessExecutor( server, executorService );
     }
 
     @Override
     public void close() throws IOException
     {
-        ss.close();
+        try
+        {
+            server.close();
+        }
+        finally
+        {
+            executorService.shutdownNow();
+        }
     }
 }


[maven-surefire] 02/03: small two changes

Posted by ti...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tibordigana pushed a commit to branch maven2surefire-jvm-communication
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git

commit 795bda5ce32bcc15b0eb93a5ca734f1f4964517b
Author: tibordigana <ti...@apache.org>
AuthorDate: Sat Nov 9 03:17:58 2019 +0100

    small two changes
---
 .../apache/maven/plugin/surefire/extensions/PipeProcessExecutor.java  | 2 +-
 .../main/java/org/apache/maven/surefire/extensions/CommandReader.java | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/PipeProcessExecutor.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/PipeProcessExecutor.java
index 562a350..1ef9494 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/PipeProcessExecutor.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/extensions/PipeProcessExecutor.java
@@ -126,7 +126,7 @@ final class PipeProcessExecutor
             }
 
             @SuppressWarnings( "checkstyle:magicnumber" )
-            int b =  currentBuffer[currentPos++] & 0xff;
+            int b = currentBuffer[currentPos++] & 0xff;
             if ( currentPos == currentBuffer.length )
             {
                 currentBuffer = null;
diff --git a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CommandReader.java b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CommandReader.java
index eb15d8e..8dd9d96 100644
--- a/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CommandReader.java
+++ b/surefire-extensions-api/src/main/java/org/apache/maven/surefire/extensions/CommandReader.java
@@ -33,9 +33,9 @@ public interface CommandReader
 {
 
     /**
-     * Waits for the next command and reads complete stream of encoded command.
+     * Waits for the next command and returns it.
      *
-     * @return encoded command, or null if closed
+     * @return the command, or null if closed
      */
     Command readNextCommand() throws IOException;
     void close();


[maven-surefire] 01/03: improved DaemonThreadFactory

Posted by ti...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tibordigana pushed a commit to branch maven2surefire-jvm-communication
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git

commit 33bbb5014a04b562ac0eaa1ead0439a09137af3f
Author: tibordigana <ti...@apache.org>
AuthorDate: Fri Nov 8 23:50:16 2019 +0100

    improved DaemonThreadFactory
---
 .../util/internal/DaemonThreadFactory.java         | 40 ++++------------------
 1 file changed, 6 insertions(+), 34 deletions(-)

diff --git a/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/DaemonThreadFactory.java b/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/DaemonThreadFactory.java
index 3610a4b..06ddc53 100644
--- a/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/DaemonThreadFactory.java
+++ b/surefire-api/src/main/java/org/apache/maven/surefire/util/internal/DaemonThreadFactory.java
@@ -19,8 +19,8 @@ package org.apache.maven.surefire.util.internal;
  * under the License.
  */
 
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Creates new daemon Thread.
@@ -28,29 +28,16 @@ import java.util.concurrent.atomic.AtomicInteger;
 public final class DaemonThreadFactory
     implements ThreadFactory
 {
-    private static final AtomicInteger POOL_NUMBER = new AtomicInteger( 1 );
-
-    private final AtomicInteger threadNumber = new AtomicInteger( 1 );
-
-    private final ThreadGroup group;
-
-    private final String namePrefix;
+    private static final ThreadFactory DEFAULT_THREAD_FACTORY = Executors.defaultThreadFactory();
 
     private DaemonThreadFactory()
     {
-        SecurityManager s = System.getSecurityManager();
-        group = s != null ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
-        namePrefix = "pool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
     }
 
     @Override
     public Thread newThread( Runnable r )
     {
-        Thread t = new Thread( group, r, namePrefix + threadNumber.getAndIncrement() );
-        if ( t.getPriority() != Thread.NORM_PRIORITY )
-        {
-            t.setPriority( Thread.NORM_PRIORITY );
-        }
+        Thread t = DEFAULT_THREAD_FACTORY.newThread( r );
         t.setDaemon( true );
         return t;
     }
@@ -71,34 +58,19 @@ public final class DaemonThreadFactory
 
     public static Thread newDaemonThread( Runnable r )
     {
-        SecurityManager s = System.getSecurityManager();
-        ThreadGroup group = s == null ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
-        Thread t = new Thread( group, r );
-        if ( t.getPriority() != Thread.NORM_PRIORITY )
-        {
-            t.setPriority( Thread.NORM_PRIORITY );
-        }
-        t.setDaemon( true );
-        return t;
+        return new DaemonThreadFactory().newThread( r );
     }
 
     public static Thread newDaemonThread( Runnable r, String name )
     {
-        SecurityManager s = System.getSecurityManager();
-        ThreadGroup group = s == null ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
-        Thread t = new Thread( group, r, name );
-        if ( t.getPriority() != Thread.NORM_PRIORITY )
-        {
-            t.setPriority( Thread.NORM_PRIORITY );
-        }
-        t.setDaemon( true );
+        Thread t = new DaemonThreadFactory().newThread( r );
+        t.setName( name );
         return t;
     }
 
     private static class NamedThreadFactory
         implements ThreadFactory
     {
-
         private final String name;
 
         private NamedThreadFactory( String name )