You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/12/07 11:05:08 UTC
svn commit: r1211363 - in /camel/trunk/components/camel-stream/src:
main/java/org/apache/camel/component/stream/StreamConsumer.java
test/java/org/apache/camel/component/stream/ScanStreamFileTest.java
Author: davsclaus
Date: Wed Dec 7 10:05:08 2011
New Revision: 1211363
URL: http://svn.apache.org/viewvc?rev=1211363&view=rev
Log:
CAMEL-4748: Ensure old stream is closed when re-initializing or closing consumer
Modified:
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java
Modified: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=1211363&r1=1211362&r2=1211363&view=diff
==============================================================================
--- camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original)
+++ camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Wed Dec 7 10:05:08 2011
@@ -36,6 +36,7 @@ import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultMessage;
+import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +51,8 @@ public class StreamConsumer extends Defa
private static final String INVALID_URI = "Invalid uri, valid form: 'stream:{" + TYPES + "}'";
private static final List<String> TYPES_LIST = Arrays.asList(TYPES.split(","));
private ExecutorService executor;
- private InputStream inputStream = System.in;
+ private volatile InputStream inputStream = System.in;
+ private volatile InputStream inputStreamToClose;
private StreamEndpoint endpoint;
private String uri;
private boolean initialPromptDone;
@@ -86,6 +88,9 @@ public class StreamConsumer extends Defa
executor = null;
}
lines.clear();
+
+ // do not close regular inputStream as it may be System.in etc.
+ IOHelper.close(inputStreamToClose);
super.doStop();
}
@@ -98,16 +103,21 @@ public class StreamConsumer extends Defa
}
private BufferedReader initializeStream() throws Exception {
+ // close old stream, before obtaining a new stream
+ IOHelper.close(inputStreamToClose);
+
if ("in".equals(uri)) {
inputStream = System.in;
+ inputStreamToClose = null;
} else if ("file".equals(uri)) {
inputStream = resolveStreamFromFile();
+ inputStreamToClose = inputStream;
} else if ("url".equals(uri)) {
inputStream = resolveStreamFromUrl();
+ inputStreamToClose = inputStream;
}
Charset charset = endpoint.getCharset();
- BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, charset));
- return br;
+ return new BufferedReader(new InputStreamReader(inputStream, charset));
}
private void readFromStream() throws Exception {
Modified: camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java?rev=1211363&r1=1211362&r2=1211363&view=diff
==============================================================================
--- camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java (original)
+++ camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java Wed Dec 7 10:05:08 2011
@@ -51,10 +51,13 @@ public class ScanStreamFileTest extends
mock.expectedMinimumMessageCount(2);
FileOutputStream fos = new FileOutputStream(file);
- fos.write("Hello\n".getBytes());
- Thread.sleep(150);
- fos.write("World\n".getBytes());
- fos.close();
+ try {
+ fos.write("Hello\n".getBytes());
+ Thread.sleep(150);
+ fos.write("World\n".getBytes());
+ } finally {
+ fos.close();
+ }
assertMockEndpointsSatisfied();
}
@@ -65,17 +68,20 @@ public class ScanStreamFileTest extends
mock.expectedMinimumMessageCount(3);
FileOutputStream fos = refreshFile(null);
- fos.write("Hello\n".getBytes());
- Thread.sleep(150);
- fos = refreshFile(fos);
- fos.write("there\n".getBytes());
- Thread.sleep(150);
- fos = refreshFile(fos);
- fos.write("World\n".getBytes());
- Thread.sleep(150);
- fos = refreshFile(fos);
- fos.write("!\n".getBytes());
- fos.close();
+ try {
+ fos.write("Hello\n".getBytes());
+ Thread.sleep(150);
+ fos = refreshFile(fos);
+ fos.write("there\n".getBytes());
+ Thread.sleep(150);
+ fos = refreshFile(fos);
+ fos.write("World\n".getBytes());
+ Thread.sleep(150);
+ fos = refreshFile(fos);
+ fos.write("!\n".getBytes());
+ } finally {
+ fos.close();
+ }
assertMockEndpointsSatisfied();
}