You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by sj...@apache.org on 2022/07/27 18:19:57 UTC

[maven-shared-utils] 01/01: [MSHARED-1072] fix blocking in StreamFeeder

This is an automated email from the ASF dual-hosted git repository.

sjaranowski pushed a commit to branch MSHARED-1072
in repository https://gitbox.apache.org/repos/asf/maven-shared-utils.git

commit 756cd5b8c1c16930c2eee81c7fcd144fd0cd3270
Author: Slawomir Jaranowski <s....@gmail.com>
AuthorDate: Wed Jul 27 20:00:36 2022 +0200

    [MSHARED-1072] fix blocking in StreamFeeder
    
    If the input stream has no more available data,
    StreamFeeder would block forever.
---
 .../utils/cli/CommandLineTimeOutException.java     |  14 ++-
 .../maven/shared/utils/cli/CommandLineUtils.java   |  87 +++--------------
 .../maven/shared/utils/cli/StreamFeeder.java       | 106 ++++++++-------------
 .../maven/shared/utils/cli/StreamFeederTest.java   |  91 ++++++++++++++++++
 4 files changed, 153 insertions(+), 145 deletions(-)

diff --git a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java
index c1f8209..620fb3f 100644
--- a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java
+++ b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java
@@ -20,16 +20,15 @@ package org.apache.maven.shared.utils.cli;
  */
 
 /**
+ * Report a timeout for executing process.
+ *
  * @author Olivier Lamy
- * 
+ *
  */
 public class CommandLineTimeOutException
     extends CommandLineException
 {
 
-    /**
-     * 
-     */
     private static final long serialVersionUID = 7322428741683224481L;
 
     /**
@@ -41,4 +40,11 @@ public class CommandLineTimeOutException
         super( message, cause );
     }
 
+    /**
+     * @param message The message of the exception.
+     */
+    public CommandLineTimeOutException( String message )
+    {
+        super( message );
+    }
 }
diff --git a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
index 2b17245..baaf869 100644
--- a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
+++ b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
@@ -27,6 +27,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -49,7 +50,7 @@ public abstract class CommandLineUtils
         implements StreamConsumer
     {
 
-        private final StringBuffer string = new StringBuffer();
+        private final StringBuilder string = new StringBuilder();
 
         private static final String LS = System.getProperty( "line.separator", "\n" );
 
@@ -72,16 +73,6 @@ public abstract class CommandLineUtils
 
     }
 
-    /**
-     * Number of milliseconds per second.
-     */
-    private static final long MILLIS_PER_SECOND = 1000L;
-
-    /**
-     * Number of nanoseconds per second.
-     */
-    private static final long NANOS_PER_SECOND = 1000000000L;
-
     /**
      * @param cl The command line {@link Commandline}
      * @param systemOut {@link StreamConsumer}
@@ -288,31 +279,13 @@ public abstract class CommandLineUtils
                     errorPumper.setName( "StreamPumper-systemErr" );
                     errorPumper.start();
 
-                    int returnValue;
-                    if ( timeoutInSeconds <= 0 )
+                    if ( timeoutInSeconds > 0 && !p.waitFor( timeoutInSeconds, TimeUnit.SECONDS ) )
                     {
-                        returnValue = p.waitFor();
+                        throw new CommandLineTimeOutException(
+                            String.format( "Process timed out after %d seconds.", timeoutInSeconds ) );
                     }
-                    else
-                    {
-                        final long now = System.nanoTime();
-                        final long timeout = now + NANOS_PER_SECOND * timeoutInSeconds;
-                        while ( isAlive( p ) && ( System.nanoTime() < timeout ) )
-                        {
-                            // The timeout is specified in seconds. Therefore we must not sleep longer than one second
-                            // but we should sleep as long as possible to reduce the number of iterations performed.
-                            Thread.sleep( MILLIS_PER_SECOND - 1L );
-                        }
 
-                        if ( isAlive( p ) )
-                        {
-                            throw new InterruptedException( String.format( "Process timed out after %d seconds.",
-                                                                           timeoutInSeconds ) );
-
-                        }
-
-                        returnValue = p.exitValue();
-                    }
+                    int returnValue = p.waitFor();
 
 // TODO Find out if waitUntilDone needs to be called using a try-finally construct. The method may throw an
 //      InterruptedException so that calls to waitUntilDone may be skipped.
@@ -342,14 +315,9 @@ public abstract class CommandLineUtils
                     outputPumper.waitUntilDone();
                     errorPumper.waitUntilDone();
 
-                    if ( inputFeeder != null )
+                    if ( inputFeeder != null && inputFeeder.getException() != null )
                     {
-                        inputFeeder.close();
-
-                        if ( inputFeeder.getException() != null )
-                        {
-                            throw new CommandLineException( "Failure processing stdin.", inputFeeder.getException() );
-                        }
+                        throw new CommandLineException( "Failure processing stdin.", inputFeeder.getException() );
                     }
 
                     if ( outputPumper.getException() != null )
@@ -366,16 +334,12 @@ public abstract class CommandLineUtils
                 }
                 catch ( InterruptedException ex )
                 {
+                    Thread.currentThread().interrupt();
                     throw new CommandLineTimeOutException( "Error while executing external command, process killed.",
                                                            ex );
-
                 }
                 finally
                 {
-                    if ( inputFeeder != null )
-                    {
-                        inputFeeder.disable();
-                    }
                     if ( outputPumper != null )
                     {
                         outputPumper.disable();
@@ -395,18 +359,7 @@ public abstract class CommandLineUtils
                     finally
                     {
                         ShutdownHookUtils.removeShutdownHook( processHook );
-
-                        try
-                        {
-                            processHook.run();
-                        }
-                        finally
-                        {
-                            if ( inputFeeder != null )
-                            {
-                                inputFeeder.close();
-                            }
-                        }
+                        processHook.run();
                     }
                 }
             }
@@ -444,24 +397,6 @@ public abstract class CommandLineUtils
         return ensureCaseSensitivity( envs, caseSensitive );
     }
 
-    private static boolean isAlive( Process p )
-    {
-        if ( p == null )
-        {
-            return false;
-        }
-
-        try
-        {
-            p.exitValue();
-            return false;
-        }
-        catch ( IllegalThreadStateException e )
-        {
-            return true;
-        }
-    }
-
     /**
      * @param toProcess The command line to translate.
      * @return The array of translated parts.
@@ -482,7 +417,7 @@ public abstract class CommandLineUtils
         boolean inEscape = false;
         int state = normal;
         final StringTokenizer tok = new StringTokenizer( toProcess, "\"\' \\", true );
-        List<String> tokens = new ArrayList<String>();
+        List<String> tokens = new ArrayList<>();
         StringBuilder current = new StringBuilder();
 
         while ( tok.hasMoreTokens() )
diff --git a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java b/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
index 6f6723c..18e15d3 100644
--- a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
+++ b/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
@@ -22,7 +22,7 @@ package org.apache.maven.shared.utils.cli;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.Objects;
 
 /**
  * Read from an InputStream and write the output to an OutputStream.
@@ -30,86 +30,70 @@ import java.util.concurrent.atomic.AtomicReference;
  * @author <a href="mailto:trygvis@inamo.no">Trygve Laugst&oslash;l</a>
  */
 class StreamFeeder
-    extends AbstractStreamHandler
+    extends Thread
 {
 
-    private final AtomicReference<InputStream> input;
+    private final InputStream input;
 
-    private final AtomicReference<OutputStream> output;
+    private final OutputStream output;
 
-    private volatile Throwable exception;
+    private Throwable exception;
+    private boolean done;
+
+    private final Object lock = new Object();
 
     /**
      * Create a new StreamFeeder
      *
-     * @param input Stream to read from
+     * @param input  Stream to read from
      * @param output Stream to write to
      */
     StreamFeeder( InputStream input, OutputStream output )
     {
-        super();
-        this.input = new AtomicReference<InputStream>( input );
-        this.output = new AtomicReference<OutputStream>( output );
+        this.input = Objects.requireNonNull( input );
+        this.output = Objects.requireNonNull( output );
+        this.done = false;
     }
 
     @Override
+    @SuppressWarnings( "checkstyle:innerassignment" )
     public void run()
     {
         try
         {
-            feed();
-        }
-        catch ( Throwable e )
-        {
-            // Catch everything so the streams will be closed and flagged as done.
-            if ( this.exception != null )
+            for ( int data; !isInterrupted() && ( data = input.read() ) != -1; )
             {
-                this.exception = e;
+                output.write( data );
             }
+            output.flush();
+        }
+        catch ( IOException e )
+        {
+            exception = e;
         }
         finally
         {
             close();
+        }
 
-            synchronized ( this )
-            {
-                notifyAll();
-            }
+        synchronized ( lock )
+        {
+            done = true;
+            lock.notifyAll();
         }
     }
 
-    public void close()
+    private void close()
     {
-        setDone();
-        final InputStream is = input.getAndSet( null );
-        if ( is != null )
+        try
         {
-            try
-            {
-                is.close();
-            }
-            catch ( IOException ex )
-            {
-                if ( this.exception != null )
-                {
-                    this.exception = ex;
-                }
-            }
+            output.close();
         }
-
-        final OutputStream os = output.getAndSet( null );
-        if ( os != null )
+        catch ( IOException e )
         {
-            try
-            {
-                os.close();
-            }
-            catch ( IOException ex )
+            if ( exception == null )
             {
-                if ( this.exception != null )
-                {
-                    this.exception = ex;
-                }
+                exception = e;
             }
         }
     }
@@ -122,30 +106,22 @@ class StreamFeeder
         return this.exception;
     }
 
-    @SuppressWarnings( "checkstyle:innerassignment" )
-    private void feed()
-        throws IOException
+    public void waitUntilDone()
     {
-        InputStream is = input.get();
-        OutputStream os = output.get();
-        boolean flush = false;
-
-        if ( is != null && os != null )
+        interrupt();
+        synchronized ( lock )
         {
-            for ( int data; !isDone() && ( data = is.read() ) != -1; )
+            while ( !done )
             {
-                if ( !isDisabled() )
+                try
                 {
-                    os.write( data );
-                    flush = true;
+                    lock.wait();
+                }
+                catch ( InterruptedException e )
+                {
+                    Thread.currentThread().interrupt();
                 }
-            }
-
-            if ( flush )
-            {
-                os.flush();
             }
         }
     }
-
 }
diff --git a/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java b/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java
new file mode 100644
index 0000000..c0a015c
--- /dev/null
+++ b/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java
@@ -0,0 +1,91 @@
+package org.apache.maven.shared.utils.cli;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class StreamFeederTest
+{
+    static class BlockingInputStream extends ByteArrayInputStream
+    {
+        boolean endStream = false;
+        final Object lock = new Object();
+
+        public BlockingInputStream( byte[] buf )
+        {
+            super( buf );
+        }
+
+        @Override
+        public synchronized int read()
+        {
+            int data = super.read();
+            if ( data >= 0 )
+            {
+                return data;
+            }
+
+            // end test data ... block
+            endStream = true;
+
+            try
+            {
+                wait();
+            }
+            catch ( InterruptedException e )
+            {
+                Thread.currentThread().interrupt();
+            }
+            return -1;
+        }
+
+        public synchronized void waitForEndStream() throws InterruptedException
+        {
+            while ( !endStream )
+            {
+                wait( 100 );
+            }
+        }
+    }
+
+    @Test
+    public void waitUntilFeederDone() throws InterruptedException
+    {
+
+        BlockingInputStream inputStream = new BlockingInputStream( "TestData".getBytes() );
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+        StreamFeeder streamFeeder = new StreamFeeder( inputStream, outputStream );
+
+        streamFeeder.start();
+
+        // wait until input stream will be in block mode
+        inputStream.waitForEndStream();
+
+        streamFeeder.waitUntilDone(); // wait until process finish
+
+        assertEquals( "TestData", outputStream.toString() );
+    }
+}