You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2015/04/07 03:16:04 UTC

flume git commit: FLUME-1934. Spooling Directory Source dies on encountering zero-byte files.

Repository: flume
Updated Branches:
  refs/heads/trunk 4e06f6fe7 -> 4d2a34e93


FLUME-1934. Spooling Directory Source dies on encountering zero-byte files.

(Grant Henke via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4d2a34e9
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4d2a34e9
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4d2a34e9

Branch: refs/heads/trunk
Commit: 4d2a34e931554baa1c1b255d95540a46354a521f
Parents: 4e06f6f
Author: Hari Shreedharan <hs...@apache.org>
Authored: Mon Apr 6 18:14:04 2015 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Mon Apr 6 18:14:04 2015 -0700

----------------------------------------------------------------------
 .../avro/ReliableSpoolingFileEventReader.java   |  6 ++--
 .../flume/source/TestSpoolDirectorySource.java  | 30 ++++++++++++++++++++
 2 files changed, 34 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/4d2a34e9/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index 27e9c1e..d54f415 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -252,8 +252,10 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     List<Event> events = des.readEvents(numEvents);
 
     /* It's possible that the last read took us just up to a file boundary.
-     * If so, try to roll to the next file, if there is one. */
-    if (events.isEmpty()) {
+     * If so, try to roll to the next file, if there is one.
+     * Loop until events is non-empty or there are no more files, so that 0-byte files are skipped. */
+    while (events.isEmpty()) {
+      logger.info("Last read took us just up to a file boundary. Rolling to the next file, if there is one.");
       retireCurrentFile();
       currentFile = getNextFile();
       if (!currentFile.isPresent()) {

http://git-wip-us.apache.org/repos/asf/flume/blob/4d2a34e9/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
index 89e7c8c..fe530ff 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
@@ -283,4 +283,34 @@ public class TestSpoolDirectorySource {
     Assert.assertEquals(8, dataOut.size());
     source.stop();
   }
+
+  @Test
+  public void testEndWithZeroByteFiles() throws IOException, InterruptedException {
+    Context context = new Context();
+
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+
+    Files.write("file1line1\n", f1, Charsets.UTF_8);
+
+    File f2 = new File(tmpDir.getAbsolutePath() + "/file2");
+    File f3 = new File(tmpDir.getAbsolutePath() + "/file3");
+    File f4 = new File(tmpDir.getAbsolutePath() + "/file4");
+
+    Files.touch(f2);
+    Files.touch(f3);
+    Files.touch(f4);
+
+    context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
+      tmpDir.getAbsolutePath());
+    Configurables.configure(source, context);
+    source.start();
+
+    // Need a better way to ensure all files were processed.
+    Thread.sleep(5000);
+
+    Assert.assertFalse("Server did not error", source.hasFatalError());
+    Assert.assertEquals("One message was read", 1,
+      source.getSourceCounter().getEventAcceptedCount());
+    source.stop();
+  }
 }