You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by sj...@apache.org on 2022/07/27 18:19:56 UTC

[maven-shared-utils] branch MSHARED-1072 created (now 756cd5b)

This is an automated email from the ASF dual-hosted git repository.

sjaranowski pushed a change to branch MSHARED-1072
in repository https://gitbox.apache.org/repos/asf/maven-shared-utils.git


      at 756cd5b  [MSHARED-1072] fix blocking in StreamFeeder

This branch includes the following new commits:

     new 756cd5b  [MSHARED-1072] fix blocking in StreamFeeder

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[maven-shared-utils] 01/01: [MSHARED-1072] fix blocking in StreamFeeder

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjaranowski pushed a commit to branch MSHARED-1072
in repository https://gitbox.apache.org/repos/asf/maven-shared-utils.git

commit 756cd5b8c1c16930c2eee81c7fcd144fd0cd3270
Author: Slawomir Jaranowski <s....@gmail.com>
AuthorDate: Wed Jul 27 20:00:36 2022 +0200

    [MSHARED-1072] fix blocking in StreamFeeder
    
    If input stream has no more available data
    StreamFeeder was block forever
---
 .../utils/cli/CommandLineTimeOutException.java     |  14 ++-
 .../maven/shared/utils/cli/CommandLineUtils.java   |  87 +++--------------
 .../maven/shared/utils/cli/StreamFeeder.java       | 106 ++++++++-------------
 .../maven/shared/utils/cli/StreamFeederTest.java   |  91 ++++++++++++++++++
 4 files changed, 153 insertions(+), 145 deletions(-)

diff --git a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java
index c1f8209..620fb3f 100644
--- a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java
+++ b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java
@@ -20,16 +20,15 @@ package org.apache.maven.shared.utils.cli;
  */
 
 /**
+ * Report a timeout for executing process.
+ *
  * @author Olivier Lamy
- * 
+ *
  */
 public class CommandLineTimeOutException
     extends CommandLineException
 {
 
-    /**
-     * 
-     */
     private static final long serialVersionUID = 7322428741683224481L;
 
     /**
@@ -41,4 +40,11 @@ public class CommandLineTimeOutException
         super( message, cause );
     }
 
+    /**
+     * @param message The message of the exception.
+     */
+    public CommandLineTimeOutException( String message )
+    {
+        super( message );
+    }
 }
diff --git a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
index 2b17245..baaf869 100644
--- a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
+++ b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
@@ -27,6 +27,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -49,7 +50,7 @@ public abstract class CommandLineUtils
         implements StreamConsumer
     {
 
-        private final StringBuffer string = new StringBuffer();
+        private final StringBuilder string = new StringBuilder();
 
         private static final String LS = System.getProperty( "line.separator", "\n" );
 
@@ -72,16 +73,6 @@ public abstract class CommandLineUtils
 
     }
 
-    /**
-     * Number of milliseconds per second.
-     */
-    private static final long MILLIS_PER_SECOND = 1000L;
-
-    /**
-     * Number of nanoseconds per second.
-     */
-    private static final long NANOS_PER_SECOND = 1000000000L;
-
     /**
      * @param cl The command line {@link Commandline}
      * @param systemOut {@link StreamConsumer}
@@ -288,31 +279,13 @@ public abstract class CommandLineUtils
                     errorPumper.setName( "StreamPumper-systemErr" );
                     errorPumper.start();
 
-                    int returnValue;
-                    if ( timeoutInSeconds <= 0 )
+                    if ( timeoutInSeconds > 0 && !p.waitFor( timeoutInSeconds, TimeUnit.SECONDS ) )
                     {
-                        returnValue = p.waitFor();
+                        throw new CommandLineTimeOutException(
+                            String.format( "Process timed out after %d seconds.", timeoutInSeconds ) );
                     }
-                    else
-                    {
-                        final long now = System.nanoTime();
-                        final long timeout = now + NANOS_PER_SECOND * timeoutInSeconds;
-                        while ( isAlive( p ) && ( System.nanoTime() < timeout ) )
-                        {
-                            // The timeout is specified in seconds. Therefore we must not sleep longer than one second
-                            // but we should sleep as long as possible to reduce the number of iterations performed.
-                            Thread.sleep( MILLIS_PER_SECOND - 1L );
-                        }
 
-                        if ( isAlive( p ) )
-                        {
-                            throw new InterruptedException( String.format( "Process timed out after %d seconds.",
-                                                                           timeoutInSeconds ) );
-
-                        }
-
-                        returnValue = p.exitValue();
-                    }
+                    int returnValue = p.waitFor();
 
 // TODO Find out if waitUntilDone needs to be called using a try-finally construct. The method may throw an
 //      InterruptedException so that calls to waitUntilDone may be skipped.
@@ -342,14 +315,9 @@ public abstract class CommandLineUtils
                     outputPumper.waitUntilDone();
                     errorPumper.waitUntilDone();
 
-                    if ( inputFeeder != null )
+                    if ( inputFeeder != null && inputFeeder.getException() != null )
                     {
-                        inputFeeder.close();
-
-                        if ( inputFeeder.getException() != null )
-                        {
-                            throw new CommandLineException( "Failure processing stdin.", inputFeeder.getException() );
-                        }
+                        throw new CommandLineException( "Failure processing stdin.", inputFeeder.getException() );
                     }
 
                     if ( outputPumper.getException() != null )
@@ -366,16 +334,12 @@ public abstract class CommandLineUtils
                 }
                 catch ( InterruptedException ex )
                 {
+                    Thread.currentThread().interrupt();
                     throw new CommandLineTimeOutException( "Error while executing external command, process killed.",
                                                            ex );
-
                 }
                 finally
                 {
-                    if ( inputFeeder != null )
-                    {
-                        inputFeeder.disable();
-                    }
                     if ( outputPumper != null )
                     {
                         outputPumper.disable();
@@ -395,18 +359,7 @@ public abstract class CommandLineUtils
                     finally
                     {
                         ShutdownHookUtils.removeShutdownHook( processHook );
-
-                        try
-                        {
-                            processHook.run();
-                        }
-                        finally
-                        {
-                            if ( inputFeeder != null )
-                            {
-                                inputFeeder.close();
-                            }
-                        }
+                        processHook.run();
                     }
                 }
             }
@@ -444,24 +397,6 @@ public abstract class CommandLineUtils
         return ensureCaseSensitivity( envs, caseSensitive );
     }
 
-    private static boolean isAlive( Process p )
-    {
-        if ( p == null )
-        {
-            return false;
-        }
-
-        try
-        {
-            p.exitValue();
-            return false;
-        }
-        catch ( IllegalThreadStateException e )
-        {
-            return true;
-        }
-    }
-
     /**
      * @param toProcess The command line to translate.
      * @return The array of translated parts.
@@ -482,7 +417,7 @@ public abstract class CommandLineUtils
         boolean inEscape = false;
         int state = normal;
         final StringTokenizer tok = new StringTokenizer( toProcess, "\"\' \\", true );
-        List<String> tokens = new ArrayList<String>();
+        List<String> tokens = new ArrayList<>();
         StringBuilder current = new StringBuilder();
 
         while ( tok.hasMoreTokens() )
diff --git a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java b/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
index 6f6723c..18e15d3 100644
--- a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
+++ b/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
@@ -22,7 +22,7 @@ package org.apache.maven.shared.utils.cli;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.Objects;
 
 /**
  * Read from an InputStream and write the output to an OutputStream.
@@ -30,86 +30,70 @@ import java.util.concurrent.atomic.AtomicReference;
  * @author <a href="mailto:trygvis@inamo.no">Trygve Laugst&oslash;l</a>
  */
 class StreamFeeder
-    extends AbstractStreamHandler
+    extends Thread
 {
 
-    private final AtomicReference<InputStream> input;
+    private final InputStream input;
 
-    private final AtomicReference<OutputStream> output;
+    private final OutputStream output;
 
-    private volatile Throwable exception;
+    private Throwable exception;
+    private boolean done;
+
+    private final Object lock = new Object();
 
     /**
      * Create a new StreamFeeder
      *
-     * @param input Stream to read from
+     * @param input  Stream to read from
      * @param output Stream to write to
      */
     StreamFeeder( InputStream input, OutputStream output )
     {
-        super();
-        this.input = new AtomicReference<InputStream>( input );
-        this.output = new AtomicReference<OutputStream>( output );
+        this.input = Objects.requireNonNull( input );
+        this.output = Objects.requireNonNull( output );
+        this.done = false;
     }
 
     @Override
+    @SuppressWarnings( "checkstyle:innerassignment" )
     public void run()
     {
         try
         {
-            feed();
-        }
-        catch ( Throwable e )
-        {
-            // Catch everything so the streams will be closed and flagged as done.
-            if ( this.exception != null )
+            for ( int data; !isInterrupted() && ( data = input.read() ) != -1; )
             {
-                this.exception = e;
+                output.write( data );
             }
+            output.flush();
+        }
+        catch ( IOException e )
+        {
+            exception = e;
         }
         finally
         {
             close();
+        }
 
-            synchronized ( this )
-            {
-                notifyAll();
-            }
+        synchronized ( lock )
+        {
+            done = true;
+            lock.notifyAll();
         }
     }
 
-    public void close()
+    private void close()
     {
-        setDone();
-        final InputStream is = input.getAndSet( null );
-        if ( is != null )
+        try
         {
-            try
-            {
-                is.close();
-            }
-            catch ( IOException ex )
-            {
-                if ( this.exception != null )
-                {
-                    this.exception = ex;
-                }
-            }
+            output.close();
         }
-
-        final OutputStream os = output.getAndSet( null );
-        if ( os != null )
+        catch ( IOException e )
         {
-            try
-            {
-                os.close();
-            }
-            catch ( IOException ex )
+            if ( exception == null )
             {
-                if ( this.exception != null )
-                {
-                    this.exception = ex;
-                }
+                exception = e;
             }
         }
     }
@@ -122,30 +106,22 @@ class StreamFeeder
         return this.exception;
     }
 
-    @SuppressWarnings( "checkstyle:innerassignment" )
-    private void feed()
-        throws IOException
+    public void waitUntilDone()
     {
-        InputStream is = input.get();
-        OutputStream os = output.get();
-        boolean flush = false;
-
-        if ( is != null && os != null )
+        interrupt();
+        synchronized ( lock )
         {
-            for ( int data; !isDone() && ( data = is.read() ) != -1; )
+            while ( !done )
             {
-                if ( !isDisabled() )
+                try
                 {
-                    os.write( data );
-                    flush = true;
+                    lock.wait();
+                }
+                catch ( InterruptedException e )
+                {
+                    Thread.currentThread().interrupt();
                 }
-            }
-
-            if ( flush )
-            {
-                os.flush();
             }
         }
     }
-
 }
diff --git a/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java b/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java
new file mode 100644
index 0000000..c0a015c
--- /dev/null
+++ b/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java
@@ -0,0 +1,91 @@
+package org.apache.maven.shared.utils.cli;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class StreamFeederTest
+{
+    static class BlockingInputStream extends ByteArrayInputStream
+    {
+        boolean endStream = false;
+        final Object lock = new Object();
+
+        public BlockingInputStream( byte[] buf )
+        {
+            super( buf );
+        }
+
+        @Override
+        public synchronized int read()
+        {
+            int data = super.read();
+            if ( data >= 0 )
+            {
+                return data;
+            }
+
+            // end test data ... block
+            endStream = true;
+
+            try
+            {
+                wait();
+            }
+            catch ( InterruptedException e )
+            {
+                Thread.currentThread().interrupt();
+            }
+            return -1;
+        }
+
+        public synchronized void waitForEndStream() throws InterruptedException
+        {
+            while ( !endStream )
+            {
+                wait( 100 );
+            }
+        }
+    }
+
+    @Test
+    public void waitUntilFeederDone() throws InterruptedException
+    {
+
+        BlockingInputStream inputStream = new BlockingInputStream( "TestData".getBytes() );
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+        StreamFeeder streamFeeder = new StreamFeeder( inputStream, outputStream );
+
+        streamFeeder.start();
+
+        // wait until input stream will be in block mode
+        inputStream.waitForEndStream();
+
+        streamFeeder.waitUntilDone(); // wait until process finish
+
+        assertEquals( "TestData", outputStream.toString() );
+    }
+}