You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/03/28 23:56:31 UTC
git commit: FLUME-2350. Consume Order tests need to space out file
creation.
Repository: flume
Updated Branches:
refs/heads/trunk 61b9bcbb6 -> 62b383a00
FLUME-2350. Consume Order tests need to space out file creation.
(Muhammad Ehsan ul Haque via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/62b383a0
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/62b383a0
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/62b383a0
Branch: refs/heads/trunk
Commit: 62b383a00c3f678b0f504dc71bf36091ddd4067a
Parents: 61b9bcb
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Mar 28 15:55:25 2014 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Mar 28 15:56:25 2014 -0700
----------------------------------------------------------------------
.../avro/ReliableSpoolingFileEventReader.java | 25 +++-
.../TestReliableSpoolingFileEventReader.java | 132 +++++++++++++++----
2 files changed, 123 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/62b383a0/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index 1818250..0bc3f23 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
+import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
@@ -429,21 +430,27 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
if (candidateFiles.isEmpty()) { // No matching file in spooling directory.
return Optional.absent();
}
-
+
File selectedFile = candidateFiles.get(0); // Select the first random file.
if (consumeOrder == ConsumeOrder.RANDOM) { // Selected file is random.
return openFile(selectedFile);
} else if (consumeOrder == ConsumeOrder.YOUNGEST) {
for (File candidateFile: candidateFiles) {
- if (candidateFile.lastModified() >
- selectedFile.lastModified()) {
+ long compare = selectedFile.lastModified() -
+ candidateFile.lastModified();
+ if (compare == 0) { // ts is same pick smallest lexicographically.
+ selectedFile = smallerLexicographical(selectedFile, candidateFile);
+ } else if (compare < 0) { // candidate is younger (cand-ts > selec-ts)
selectedFile = candidateFile;
}
}
} else { // default order is OLDEST
for (File candidateFile: candidateFiles) {
- if (candidateFile.lastModified() <
- selectedFile.lastModified()) {
+ long compare = selectedFile.lastModified() -
+ candidateFile.lastModified();
+ if (compare == 0) { // ts is same pick smallest lexicographically.
+ selectedFile = smallerLexicographical(selectedFile, candidateFile);
+ } else if (compare > 0) { // candidate is older (cand-ts < selec-ts).
selectedFile = candidateFile;
}
}
@@ -451,7 +458,13 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
return openFile(selectedFile);
}
-
+
+ private File smallerLexicographical(File f1, File f2) {
+ if (f1.getName().compareTo(f2.getName()) < 0) {
+ return f1;
+ }
+ return f2;
+ }
/**
* Opens a file for consuming
* @param file
http://git-wip-us.apache.org/repos/asf/flume/blob/62b383a0/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
index 0b07e7a..6a02612 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
@@ -203,36 +203,42 @@ public class TestReliableSpoolingFileEventReader {
@Test
public void testConsumeFileRandomly() throws IOException {
- ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
+ ReliableEventReader reader
+ = new ReliableSpoolingFileEventReader.Builder()
.spoolDirectory(WORK_DIR)
.consumeOrder(ConsumeOrder.RANDOM)
.build();
File fileName = new File(WORK_DIR, "new-file");
- FileUtils.write(fileName, "New file created in the end. Shoud be read randomly.\n");
- Set<String> actual = Sets.newHashSet();
+ FileUtils.write(fileName,
+ "New file created in the end. Shoud be read randomly.\n");
+ Set<String> actual = Sets.newHashSet();
readEventsForFilesInDir(WORK_DIR, reader, actual);
Set<String> expected = Sets.newHashSet();
createExpectedFromFilesInSetup(expected);
expected.add("");
- expected.add("New file created in the end. Shoud be read randomly.");
+ expected.add(
+ "New file created in the end. Shoud be read randomly.");
Assert.assertEquals(expected, actual);
}
@Test
public void testConsumeFileOldest() throws IOException, InterruptedException {
- ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
- .spoolDirectory(WORK_DIR)
- .consumeOrder(ConsumeOrder.OLDEST)
- .build();
+ ReliableEventReader reader
+ = new ReliableSpoolingFileEventReader.Builder()
+ .spoolDirectory(WORK_DIR)
+ .consumeOrder(ConsumeOrder.OLDEST)
+ .build();
File file1 = new File(WORK_DIR, "new-file1");
File file2 = new File(WORK_DIR, "new-file2");
File file3 = new File(WORK_DIR, "new-file3");
- FileUtils.write(file2, "New file2 created.\n"); // file2 becoming older than file1 & file3
Thread.sleep(1000L);
- FileUtils.write(file1, "New file1 created.\n"); // file1 becoming older than file3
+ FileUtils.write(file2, "New file2 created.\n");
+ Thread.sleep(1000L);
+ FileUtils.write(file1, "New file1 created.\n");
+ Thread.sleep(1000L);
FileUtils.write(file3, "New file3 created.\n");
-
+ // order of age oldest to youngest (file2, file1, file3)
List<String> actual = Lists.newLinkedList();
readEventsForFilesInDir(WORK_DIR, reader, actual);
List<String> expected = Lists.newLinkedList();
@@ -245,25 +251,30 @@ public class TestReliableSpoolingFileEventReader {
}
@Test
- public void testConsumeFileYoungest() throws IOException, InterruptedException {
- ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
- .spoolDirectory(WORK_DIR)
- .consumeOrder(ConsumeOrder.YOUNGEST)
- .build();
- Thread.sleep(1000L);
- File file1 = new File(WORK_DIR, "new-file1");
- File file2 = new File(WORK_DIR, "new-file2");
+ public void testConsumeFileYoungest()
+ throws IOException, InterruptedException {
+ ReliableEventReader reader
+ = new ReliableSpoolingFileEventReader.Builder()
+ .spoolDirectory(WORK_DIR)
+ .consumeOrder(ConsumeOrder.YOUNGEST)
+ .build();
+ File file1 = new File(WORK_DIR, "new-file1");
+ File file2 = new File(WORK_DIR, "new-file2");
File file3 = new File(WORK_DIR, "new-file3");
- FileUtils.write(file2, "New file2 created.\n"); // file2 is oldest among file1 & file3.
- Thread.sleep(1000L);
- FileUtils.write(file3, "New file3 created.\n"); // file3 becomes youngest then file2 but older from file1.
- FileUtils.write(file1, "New file1 created.\n"); // file1 becomes youngest in file2 & file3.
+ Thread.sleep(1000L);
+ FileUtils.write(file2, "New file2 created.\n");
+ Thread.sleep(1000L);
+ FileUtils.write(file3, "New file3 created.\n");
+ Thread.sleep(1000L);
+ FileUtils.write(file1, "New file1 created.\n");
+ // order of age youngest to oldest (file2, file3, file1)
List<String> actual = Lists.newLinkedList();
readEventsForFilesInDir(WORK_DIR, reader, actual);
List<String> expected = Lists.newLinkedList();
createExpectedFromFilesInSetup(expected);
Collections.sort(expected);
- expected.add(0, ""); // Empty Line file was added in the last in Setup.
+ // Empty Line file was added in the last in Setup.
+ expected.add(0, "");
expected.add(0, "New file2 created.");
expected.add(0, "New file3 created.");
expected.add(0, "New file1 created.");
@@ -271,6 +282,66 @@ public class TestReliableSpoolingFileEventReader {
Assert.assertEquals(expected, actual);
}
+ @Test
+ public void testConsumeFileOldestWithLexicographicalComparision()
+ throws IOException, InterruptedException {
+ ReliableEventReader reader
+ = new ReliableSpoolingFileEventReader.Builder()
+ .spoolDirectory(WORK_DIR)
+ .consumeOrder(ConsumeOrder.OLDEST)
+ .build();
+ File file1 = new File(WORK_DIR, "new-file1");
+ File file2 = new File(WORK_DIR, "new-file2");
+ File file3 = new File(WORK_DIR, "new-file3");
+ Thread.sleep(1000L);
+ FileUtils.write(file3, "New file3 created.\n");
+ FileUtils.write(file2, "New file2 created.\n");
+ FileUtils.write(file1, "New file1 created.\n");
+ file1.setLastModified(file3.lastModified());
+ file1.setLastModified(file2.lastModified());
+ // file ages are same now they need to be ordered
+ // lexicographically (file1, file2, file3).
+ List<String> actual = Lists.newLinkedList();
+ readEventsForFilesInDir(WORK_DIR, reader, actual);
+ List<String> expected = Lists.newLinkedList();
+ createExpectedFromFilesInSetup(expected);
+ expected.add(""); // Empty file was added in the last in setup.
+ expected.add("New file1 created.");
+ expected.add("New file2 created.");
+ expected.add("New file3 created.");
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testConsumeFileYoungestWithLexicographicalComparision()
+ throws IOException, InterruptedException {
+ ReliableEventReader reader
+ = new ReliableSpoolingFileEventReader.Builder()
+ .spoolDirectory(WORK_DIR)
+ .consumeOrder(ConsumeOrder.YOUNGEST)
+ .build();
+ File file1 = new File(WORK_DIR, "new-file1");
+ File file2 = new File(WORK_DIR, "new-file2");
+ File file3 = new File(WORK_DIR, "new-file3");
+ Thread.sleep(1000L);
+ FileUtils.write(file1, "New file1 created.\n");
+ FileUtils.write(file2, "New file2 created.\n");
+ FileUtils.write(file3, "New file3 created.\n");
+ file1.setLastModified(file3.lastModified());
+ file1.setLastModified(file2.lastModified());
+ // file ages are same now they need to be ordered
+ // lexicographically (file1, file2, file3).
+ List<String> actual = Lists.newLinkedList();
+ readEventsForFilesInDir(WORK_DIR, reader, actual);
+ List<String> expected = Lists.newLinkedList();
+ createExpectedFromFilesInSetup(expected);
+ expected.add(0, ""); // Empty file was added in the last in setup.
+ expected.add(0, "New file3 created.");
+ expected.add(0, "New file2 created.");
+ expected.add(0, "New file1 created.");
+ Assert.assertEquals(expected, actual);
+ }
+
@Test public void testLargeNumberOfFilesOLDEST() throws IOException {
templateTestForLargeNumberOfFiles(ConsumeOrder.OLDEST, null, 1000);
}
@@ -291,9 +362,12 @@ public class TestReliableSpoolingFileEventReader {
int N) throws IOException {
File dir = null;
try {
- dir = new File("target/test/work/" + this.getClass().getSimpleName()+ "_large");
+ dir = new File(
+ "target/test/work/" + this.getClass().getSimpleName() +
+ "_large");
Files.createParentDirs(new File(dir, "dummy"));
- ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
+ ReliableEventReader reader
+ = new ReliableSpoolingFileEventReader.Builder()
.spoolDirectory(dir).consumeOrder(order).build();
Map<Long, List<String>> expected;
if (comparator == null) {
@@ -328,8 +402,10 @@ public class TestReliableSpoolingFileEventReader {
if (order == ConsumeOrder.RANDOM) {
Assert.assertTrue(expectedList.remove(new String(e.getBody())));
} else {
- Assert.assertEquals(((ArrayList<String>)expectedList).get(0), new String(e.getBody()));
- ((ArrayList<String>)expectedList).remove(0);
+ Assert.assertEquals(
+ ((ArrayList<String>) expectedList).get(0),
+ new String(e.getBody()));
+ ((ArrayList<String>) expectedList).remove(0);
}
}
reader.commit();