You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by sl...@apache.org on 2010/06/24 21:22:31 UTC

svn commit: r957691 - in /camel/trunk/components/camel-stream/src: main/java/org/apache/camel/component/stream/StreamConsumer.java test/java/org/apache/camel/component/stream/ScanStreamFileTest.java

Author: slewis
Date: Thu Jun 24 19:22:31 2010
New Revision: 957691

URL: http://svn.apache.org/viewvc?rev=957691&view=rev
Log:
CAMEL-2853 - camel-stream - tailing logfile does not seem to work when logfile is rolled over

Modified:
    camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
    camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java

Modified: camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java?rev=957691&r1=957690&r2=957691&view=diff
==============================================================================
--- camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java (original)
+++ camel/trunk/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java Thu Jun 24 19:22:31 2010
@@ -64,13 +64,7 @@ public class StreamConsumer extends Defa
     protected void doStart() throws Exception {
         super.doStart();
 
-        if ("in".equals(uri)) {
-            inputStream = System.in;
-        } else if ("file".equals(uri)) {
-            inputStream = resolveStreamFromFile();
-        } else if ("url".equals(uri)) {
-            inputStream = resolveStreamFromUrl();
-        }
+        initializeStream();
 
         executor = endpoint.getCamelContext().getExecutorServiceStrategy().newSingleThreadExecutor(this, endpoint.getEndpointUri());
         executor.execute(this);
@@ -95,10 +89,22 @@ public class StreamConsumer extends Defa
         }
     }
 
-    private void readFromStream() throws Exception {
+    private BufferedReader initializeStream() throws Exception {
+        if ("in".equals(uri)) {
+            inputStream = System.in;
+        } else if ("file".equals(uri)) {
+            inputStream = resolveStreamFromFile();
+        } else if ("url".equals(uri)) {
+            inputStream = resolveStreamFromUrl();
+        }
         Charset charset = endpoint.getCharset();
         BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, charset));
+        return br;
+    }
+
+    private void readFromStream() throws Exception {
         String line;
+        BufferedReader br = initializeStream();
 
         if (endpoint.isScanStream()) {
             // repeat scanning from stream
@@ -110,6 +116,9 @@ public class StreamConsumer extends Defa
                 boolean eos = line == null;
                 if (!eos && isRunAllowed()) {
                     processLine(line);
+                } else if (eos && isRunAllowed()) {
+                    // end of stream reached: try to re-open it, since the file may have been rolled over
+                    br = initializeStream();
                 }
                 try {
                     Thread.sleep(endpoint.getScanStreamDelay());

Modified: camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java?rev=957691&r1=957690&r2=957691&view=diff
==============================================================================
--- camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java (original)
+++ camel/trunk/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java Thu Jun 24 19:22:31 2010
@@ -60,6 +60,37 @@ public class ScanStreamFileTest extends 
         fos.close();
     }
 
+    @Test
+    public void testScanRefreshedFile() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello", "there", "World", "!");
+
+        FileOutputStream fos = refreshFile(null);
+        fos.write("Hello\n".getBytes());
+        Thread.sleep(150);
+        fos.write("there\n".getBytes());
+        fos = refreshFile(fos);
+        Thread.sleep(150);
+        fos.write("World\n".getBytes());
+        Thread.sleep(150);
+        fos = refreshFile(fos);
+        Thread.sleep(150);
+        fos.write("!\n".getBytes());
+
+        assertMockEndpointsSatisfied();
+
+        fos.close();
+    }
+
+    private FileOutputStream refreshFile(FileOutputStream fos) throws Exception {
+        if (fos != null) {
+            fos.close();
+        }
+        file.delete();
+        file.createNewFile();
+        return new FileOutputStream(file);
+    }
+
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
@@ -68,4 +99,4 @@ public class ScanStreamFileTest extends 
         };
     }
 
-}
\ No newline at end of file
+}