You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@maven.apache.org by sj...@apache.org on 2022/07/27 18:19:57 UTC
[maven-shared-utils] 01/01: [MSHARED-1072] fix blocking in StreamFeeder
This is an automated email from the ASF dual-hosted git repository.
sjaranowski pushed a commit to branch MSHARED-1072
in repository https://gitbox.apache.org/repos/asf/maven-shared-utils.git
commit 756cd5b8c1c16930c2eee81c7fcd144fd0cd3270
Author: Slawomir Jaranowski <s....@gmail.com>
AuthorDate: Wed Jul 27 20:00:36 2022 +0200
[MSHARED-1072] fix blocking in StreamFeeder
If input stream has no more available data
StreamFeeder was block forever
---
.../utils/cli/CommandLineTimeOutException.java | 14 ++-
.../maven/shared/utils/cli/CommandLineUtils.java | 87 +++--------------
.../maven/shared/utils/cli/StreamFeeder.java | 106 ++++++++-------------
.../maven/shared/utils/cli/StreamFeederTest.java | 91 ++++++++++++++++++
4 files changed, 153 insertions(+), 145 deletions(-)
diff --git a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java
index c1f8209..620fb3f 100644
--- a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java
+++ b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineTimeOutException.java
@@ -20,16 +20,15 @@ package org.apache.maven.shared.utils.cli;
*/
/**
+ * Report a timeout for executing process.
+ *
* @author Olivier Lamy
- *
+ *
*/
public class CommandLineTimeOutException
extends CommandLineException
{
- /**
- *
- */
private static final long serialVersionUID = 7322428741683224481L;
/**
@@ -41,4 +40,11 @@ public class CommandLineTimeOutException
super( message, cause );
}
+ /**
+ * @param message The message of the exception.
+ */
+ public CommandLineTimeOutException( String message )
+ {
+ super( message );
+ }
}
diff --git a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
index 2b17245..baaf869 100644
--- a/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
+++ b/src/main/java/org/apache/maven/shared/utils/cli/CommandLineUtils.java
@@ -27,6 +27,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -49,7 +50,7 @@ public abstract class CommandLineUtils
implements StreamConsumer
{
- private final StringBuffer string = new StringBuffer();
+ private final StringBuilder string = new StringBuilder();
private static final String LS = System.getProperty( "line.separator", "\n" );
@@ -72,16 +73,6 @@ public abstract class CommandLineUtils
}
- /**
- * Number of milliseconds per second.
- */
- private static final long MILLIS_PER_SECOND = 1000L;
-
- /**
- * Number of nanoseconds per second.
- */
- private static final long NANOS_PER_SECOND = 1000000000L;
-
/**
* @param cl The command line {@link Commandline}
* @param systemOut {@link StreamConsumer}
@@ -288,31 +279,13 @@ public abstract class CommandLineUtils
errorPumper.setName( "StreamPumper-systemErr" );
errorPumper.start();
- int returnValue;
- if ( timeoutInSeconds <= 0 )
+ if ( timeoutInSeconds > 0 && !p.waitFor( timeoutInSeconds, TimeUnit.SECONDS ) )
{
- returnValue = p.waitFor();
+ throw new CommandLineTimeOutException(
+ String.format( "Process timed out after %d seconds.", timeoutInSeconds ) );
}
- else
- {
- final long now = System.nanoTime();
- final long timeout = now + NANOS_PER_SECOND * timeoutInSeconds;
- while ( isAlive( p ) && ( System.nanoTime() < timeout ) )
- {
- // The timeout is specified in seconds. Therefore we must not sleep longer than one second
- // but we should sleep as long as possible to reduce the number of iterations performed.
- Thread.sleep( MILLIS_PER_SECOND - 1L );
- }
- if ( isAlive( p ) )
- {
- throw new InterruptedException( String.format( "Process timed out after %d seconds.",
- timeoutInSeconds ) );
-
- }
-
- returnValue = p.exitValue();
- }
+ int returnValue = p.waitFor();
// TODO Find out if waitUntilDone needs to be called using a try-finally construct. The method may throw an
// InterruptedException so that calls to waitUntilDone may be skipped.
@@ -342,14 +315,9 @@ public abstract class CommandLineUtils
outputPumper.waitUntilDone();
errorPumper.waitUntilDone();
- if ( inputFeeder != null )
+ if ( inputFeeder != null && inputFeeder.getException() != null )
{
- inputFeeder.close();
-
- if ( inputFeeder.getException() != null )
- {
- throw new CommandLineException( "Failure processing stdin.", inputFeeder.getException() );
- }
+ throw new CommandLineException( "Failure processing stdin.", inputFeeder.getException() );
}
if ( outputPumper.getException() != null )
@@ -366,16 +334,12 @@ public abstract class CommandLineUtils
}
catch ( InterruptedException ex )
{
+ Thread.currentThread().interrupt();
throw new CommandLineTimeOutException( "Error while executing external command, process killed.",
ex );
-
}
finally
{
- if ( inputFeeder != null )
- {
- inputFeeder.disable();
- }
if ( outputPumper != null )
{
outputPumper.disable();
@@ -395,18 +359,7 @@ public abstract class CommandLineUtils
finally
{
ShutdownHookUtils.removeShutdownHook( processHook );
-
- try
- {
- processHook.run();
- }
- finally
- {
- if ( inputFeeder != null )
- {
- inputFeeder.close();
- }
- }
+ processHook.run();
}
}
}
@@ -444,24 +397,6 @@ public abstract class CommandLineUtils
return ensureCaseSensitivity( envs, caseSensitive );
}
- private static boolean isAlive( Process p )
- {
- if ( p == null )
- {
- return false;
- }
-
- try
- {
- p.exitValue();
- return false;
- }
- catch ( IllegalThreadStateException e )
- {
- return true;
- }
- }
-
/**
* @param toProcess The command line to translate.
* @return The array of translated parts.
@@ -482,7 +417,7 @@ public abstract class CommandLineUtils
boolean inEscape = false;
int state = normal;
final StringTokenizer tok = new StringTokenizer( toProcess, "\"\' \\", true );
- List<String> tokens = new ArrayList<String>();
+ List<String> tokens = new ArrayList<>();
StringBuilder current = new StringBuilder();
while ( tok.hasMoreTokens() )
diff --git a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java b/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
index 6f6723c..18e15d3 100644
--- a/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
+++ b/src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
@@ -22,7 +22,7 @@ package org.apache.maven.shared.utils.cli;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.Objects;
/**
* Read from an InputStream and write the output to an OutputStream.
@@ -30,86 +30,70 @@ import java.util.concurrent.atomic.AtomicReference;
* @author <a href="mailto:trygvis@inamo.no">Trygve Laugstøl</a>
*/
class StreamFeeder
- extends AbstractStreamHandler
+ extends Thread
{
- private final AtomicReference<InputStream> input;
+ private final InputStream input;
- private final AtomicReference<OutputStream> output;
+ private final OutputStream output;
- private volatile Throwable exception;
+ private Throwable exception;
+ private boolean done;
+
+ private final Object lock = new Object();
/**
* Create a new StreamFeeder
*
- * @param input Stream to read from
+ * @param input Stream to read from
* @param output Stream to write to
*/
StreamFeeder( InputStream input, OutputStream output )
{
- super();
- this.input = new AtomicReference<InputStream>( input );
- this.output = new AtomicReference<OutputStream>( output );
+ this.input = Objects.requireNonNull( input );
+ this.output = Objects.requireNonNull( output );
+ this.done = false;
}
@Override
+ @SuppressWarnings( "checkstyle:innerassignment" )
public void run()
{
try
{
- feed();
- }
- catch ( Throwable e )
- {
- // Catch everything so the streams will be closed and flagged as done.
- if ( this.exception != null )
+ for ( int data; !isInterrupted() && ( data = input.read() ) != -1; )
{
- this.exception = e;
+ output.write( data );
}
+ output.flush();
+ }
+ catch ( IOException e )
+ {
+ exception = e;
}
finally
{
close();
+ }
- synchronized ( this )
- {
- notifyAll();
- }
+ synchronized ( lock )
+ {
+ done = true;
+ lock.notifyAll();
}
}
- public void close()
+ private void close()
{
- setDone();
- final InputStream is = input.getAndSet( null );
- if ( is != null )
+ try
{
- try
- {
- is.close();
- }
- catch ( IOException ex )
- {
- if ( this.exception != null )
- {
- this.exception = ex;
- }
- }
+ output.close();
}
-
- final OutputStream os = output.getAndSet( null );
- if ( os != null )
+ catch ( IOException e )
{
- try
- {
- os.close();
- }
- catch ( IOException ex )
+ if ( exception == null )
{
- if ( this.exception != null )
- {
- this.exception = ex;
- }
+ exception = e;
}
}
}
@@ -122,30 +106,22 @@ class StreamFeeder
return this.exception;
}
- @SuppressWarnings( "checkstyle:innerassignment" )
- private void feed()
- throws IOException
+ public void waitUntilDone()
{
- InputStream is = input.get();
- OutputStream os = output.get();
- boolean flush = false;
-
- if ( is != null && os != null )
+ interrupt();
+ synchronized ( lock )
{
- for ( int data; !isDone() && ( data = is.read() ) != -1; )
+ while ( !done )
{
- if ( !isDisabled() )
+ try
{
- os.write( data );
- flush = true;
+ lock.wait();
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
}
- }
-
- if ( flush )
- {
- os.flush();
}
}
}
-
}
diff --git a/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java b/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java
new file mode 100644
index 0000000..c0a015c
--- /dev/null
+++ b/src/test/java/org/apache/maven/shared/utils/cli/StreamFeederTest.java
@@ -0,0 +1,91 @@
+package org.apache.maven.shared.utils.cli;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class StreamFeederTest
+{
+ static class BlockingInputStream extends ByteArrayInputStream
+ {
+ boolean endStream = false;
+ final Object lock = new Object();
+
+ public BlockingInputStream( byte[] buf )
+ {
+ super( buf );
+ }
+
+ @Override
+ public synchronized int read()
+ {
+ int data = super.read();
+ if ( data >= 0 )
+ {
+ return data;
+ }
+
+ // end test data ... block
+ endStream = true;
+
+ try
+ {
+ wait();
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
+ }
+ return -1;
+ }
+
+ public synchronized void waitForEndStream() throws InterruptedException
+ {
+ while ( !endStream )
+ {
+ wait( 100 );
+ }
+ }
+ }
+
+ @Test
+ public void waitUntilFeederDone() throws InterruptedException
+ {
+
+ BlockingInputStream inputStream = new BlockingInputStream( "TestData".getBytes() );
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+ StreamFeeder streamFeeder = new StreamFeeder( inputStream, outputStream );
+
+ streamFeeder.start();
+
+ // wait until input stream will be in block mode
+ inputStream.waitForEndStream();
+
+ streamFeeder.waitUntilDone(); // wait until process finish
+
+ assertEquals( "TestData", outputStream.toString() );
+ }
+}