You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@camel.apache.org by sl...@apache.org on 2010/06/24 21:22:31 UTC
svn commit: r957691 - in /camel/trunk/components/camel-stream/src:
main/java/org/apache/camel/component/stream/StreamConsumer.java
test/java/org/apache/camel/component/stream/ScanStreamFileTest.java
Author: slewis
Date: Thu Jun 24 19:22:31 2010
New Revision: 957691
URL: http://svn.apache.org/viewvc?rev=957691&view=rev
Log:
CAMEL-2853 - camel-stream - tailing logfile does not seem to work when logfile is rolled over
Modified:
camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java
Modified: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=957691&r1=957690&r2=957691&view=diff
==============================================================================
--- camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original)
+++ camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Thu Jun 24 19:22:31 2010
@@ -64,13 +64,7 @@ public class StreamConsumer extends Defa
protected void doStart() throws Exception {
super.doStart();
- if ("in".equals(uri)) {
- inputStream = System.in;
- } else if ("file".equals(uri)) {
- inputStream = resolveStreamFromFile();
- } else if ("url".equals(uri)) {
- inputStream = resolveStreamFromUrl();
- }
+ initializeStream();
executor = endpoint.getCamelContext().getExecutorServiceStrategy().newSingleThreadExecutor(this, endpoint.getEndpointUri());
executor.execute(this);
@@ -95,10 +89,22 @@ public class StreamConsumer extends Defa
}
}
- private void readFromStream() throws Exception {
+ private BufferedReader initializeStream() throws Exception {
+ if ("in".equals(uri)) {
+ inputStream = System.in;
+ } else if ("file".equals(uri)) {
+ inputStream = resolveStreamFromFile();
+ } else if ("url".equals(uri)) {
+ inputStream = resolveStreamFromUrl();
+ }
Charset charset = endpoint.getCharset();
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, charset));
+ return br;
+ }
+
+ private void readFromStream() throws Exception {
String line;
+ BufferedReader br = initializeStream();
if (endpoint.isScanStream()) {
// repeat scanning from stream
@@ -110,6 +116,9 @@ public class StreamConsumer extends Defa
boolean eos = line == null;
if (!eos && isRunAllowed()) {
processLine(line);
+ } else if (eos && isRunAllowed()) {
+ //try and re-open stream
+ br = initializeStream();
}
try {
Thread.sleep(endpoint.getScanStreamDelay());
Modified: camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java?rev=957691&r1=957690&r2=957691&view=diff
==============================================================================
--- camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java (original)
+++ camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java Thu Jun 24 19:22:31 2010
@@ -60,6 +60,37 @@ public class ScanStreamFileTest extends
fos.close();
}
+ @Test
+ public void testScanRefreshedFile() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello", "there", "World", "!");
+
+ FileOutputStream fos = refreshFile(null);
+ fos.write("Hello\n".getBytes());
+ Thread.sleep(150);
+ fos.write("there\n".getBytes());
+ fos = refreshFile(fos);
+ Thread.sleep(150);
+ fos.write("World\n".getBytes());
+ Thread.sleep(150);
+ fos = refreshFile(fos);
+ Thread.sleep(150);
+ fos.write("!\n".getBytes());
+
+ assertMockEndpointsSatisfied();
+
+ fos.close();
+ }
+
+ private FileOutputStream refreshFile(FileOutputStream fos) throws Exception {
+ if (fos != null) {
+ fos.close();
+ }
+ file.delete();
+ file.createNewFile();
+ return new FileOutputStream(file);
+ }
+
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
@@ -68,4 +99,4 @@ public class ScanStreamFileTest extends
};
}
-}
\ No newline at end of file
+}