You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/11/06 20:32:46 UTC
flume git commit: FLUME-2525. Handle a zero byte .flumespool-main.meta file for the spooldir source.
Repository: flume
Updated Branches:
refs/heads/trunk 7e21ad36c -> efbf87fb6
FLUME-2525. Handle a zero byte .flumespool-main.meta file for the spooldir source.
(Johny Rufus via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/efbf87fb
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/efbf87fb
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/efbf87fb
Branch: refs/heads/trunk
Commit: efbf87fb6ddc0bbc736446a5a91cf6a83d34d2d4
Parents: 7e21ad3
Author: Hari Shreedharan <hs...@apache.org>
Authored: Thu Nov 6 11:31:35 2014 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Thu Nov 6 11:31:35 2014 -0800
----------------------------------------------------------------------
.../avro/ReliableSpoolingFileEventReader.java | 3 ++
.../TestReliableSpoolingFileEventReader.java | 29 ++++++++++++++++++++
2 files changed, 32 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/efbf87fb/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index 1833076..27e9c1e 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -199,6 +199,9 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
}
this.metaFile = new File(trackerDirectory, metaFileName);
+ if(metaFile.exists() && metaFile.length() == 0) {
+ deleteMetaFile();
+ }
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/flume/blob/efbf87fb/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
index a6b2473..c6ff63e 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
@@ -405,6 +405,35 @@ public class TestReliableSpoolingFileEventReader {
@Test public void testLargeNumberOfFilesRANDOM() throws IOException {
templateTestForLargeNumberOfFiles(ConsumeOrder.RANDOM, null, 1000);
}
+
+ @Test // Regression test for FLUME-2525: an empty (zero-byte) tracker meta file must not stop the reader from returning events.
+ public void testZeroByteTrackerFile() throws IOException {
+ String trackerDirPath =
+ SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
+ File trackerDir = new File(WORK_DIR, trackerDirPath); // resolved against WORK_DIR, the same directory passed as spoolDirectory below
+ if(!trackerDir.exists()) {
+ trackerDir.mkdir();
+ }
+ File trackerFile = new File(trackerDir, ReliableSpoolingFileEventReader.metaFileName);
+ if(trackerFile.exists()) { // remove any leftover meta file so createNewFile() below produces a fresh, empty one
+ trackerFile.delete();
+ }
+ trackerFile.createNewFile(); // File.createNewFile() creates an empty file: this is the zero-byte meta file under test
+
+ ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
+ .spoolDirectory(WORK_DIR).trackerDirPath(trackerDirPath).build();
+ final int expectedLines = 1; // NOTE(review): presumably the fixture leaves exactly one readable line in WORK_DIR - confirm against the test setup
+ int seenLines = 0;
+ List<Event> events = reader.readEvents(10); // NOTE(review): 10 is presumably an upper bound on returned events - confirm against ReliableEventReader
+ int numEvents = events.size();
+ if (numEvents > 0) { // only commit when something was actually read
+ seenLines += numEvents;
+ reader.commit();
+ }
+ // This line will fail, if the zero-byte tracker file has not been handled
+ Assert.assertEquals(expectedLines, seenLines);
+ }
+
private void templateTestForLargeNumberOfFiles(ConsumeOrder order,
Comparator<Long> comparator,
int N) throws IOException {