You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2015/04/07 03:16:04 UTC
flume git commit: FLUME-1934. Spooling Directory Source dies on
encountering zero-byte files.
Repository: flume
Updated Branches:
refs/heads/trunk 4e06f6fe7 -> 4d2a34e93
FLUME-1934. Spooling Directory Source dies on encountering zero-byte files.
(Grant Henke via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4d2a34e9
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4d2a34e9
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4d2a34e9
Branch: refs/heads/trunk
Commit: 4d2a34e931554baa1c1b255d95540a46354a521f
Parents: 4e06f6f
Author: Hari Shreedharan <hs...@apache.org>
Authored: Mon Apr 6 18:14:04 2015 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Mon Apr 6 18:14:04 2015 -0700
----------------------------------------------------------------------
.../avro/ReliableSpoolingFileEventReader.java | 6 ++--
.../flume/source/TestSpoolDirectorySource.java | 30 ++++++++++++++++++++
2 files changed, 34 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/4d2a34e9/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index 27e9c1e..d54f415 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -252,8 +252,10 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
List<Event> events = des.readEvents(numEvents);
/* It's possible that the last read took us just up to a file boundary.
- * If so, try to roll to the next file, if there is one. */
- if (events.isEmpty()) {
+ * If so, try to roll to the next file, if there is one.
+ * Loop until events is not empty or there is no next file in case of 0 byte files */
+ while (events.isEmpty()) {
+ logger.info("Last read took us just up to a file boundary. Rolling to the next file, if there is one.");
retireCurrentFile();
currentFile = getNextFile();
if (!currentFile.isPresent()) {
http://git-wip-us.apache.org/repos/asf/flume/blob/4d2a34e9/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
index 89e7c8c..fe530ff 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
@@ -283,4 +283,34 @@ public class TestSpoolDirectorySource {
Assert.assertEquals(8, dataOut.size());
source.stop();
}
+
+ @Test
+ public void testEndWithZeroByteFiles() throws IOException, InterruptedException {
+ Context context = new Context();
+ // FLUME-1934 regression test: one file with content, then files that are only touched.
+ File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+
+ Files.write("file1line1\n", f1, Charsets.UTF_8);
+ // file2..file4 are created via touch only; this test writes no content to them.
+ File f2 = new File(tmpDir.getAbsolutePath() + "/file2");
+ File f3 = new File(tmpDir.getAbsolutePath() + "/file3");
+ File f4 = new File(tmpDir.getAbsolutePath() + "/file4");
+
+ Files.touch(f2);
+ Files.touch(f3);
+ Files.touch(f4);
+ // Point the source at tmpDir as its spool directory, then start it.
+ context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
+ tmpDir.getAbsolutePath());
+ Configurables.configure(source, context);
+ source.start();
+
+ // Fixed 5 s wait; need better way to ensure all files were processed (timing-dependent).
+ Thread.sleep(5000);
+ // Expect no fatal error and exactly one accepted event (the single line in file1).
+ Assert.assertFalse("Server did not error", source.hasFatalError());
+ Assert.assertEquals("One message was read", 1,
+ source.getSourceCounter().getEventAcceptedCount());
+ source.stop();
+ }
}