You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/03/28 23:56:31 UTC

git commit: FLUME-2350. Consume Order tests need to space out file creation.

Repository: flume
Updated Branches:
  refs/heads/trunk 61b9bcbb6 -> 62b383a00


FLUME-2350. Consume Order tests need to space out file creation.

(Muhammad Ehsan ul Haque via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/62b383a0
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/62b383a0
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/62b383a0

Branch: refs/heads/trunk
Commit: 62b383a00c3f678b0f504dc71bf36091ddd4067a
Parents: 61b9bcb
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Mar 28 15:55:25 2014 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Mar 28 15:56:25 2014 -0700

----------------------------------------------------------------------
 .../avro/ReliableSpoolingFileEventReader.java   |  25 +++-
 .../TestReliableSpoolingFileEventReader.java    | 132 +++++++++++++++----
 2 files changed, 123 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/62b383a0/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index 1818250..0bc3f23 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.io.Files;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.IOFileFilter;
+import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
@@ -429,21 +430,27 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     if (candidateFiles.isEmpty()) { // No matching file in spooling directory.
       return Optional.absent();
     }
-    
+
     File selectedFile = candidateFiles.get(0); // Select the first random file.
     if (consumeOrder == ConsumeOrder.RANDOM) { // Selected file is random.
       return openFile(selectedFile);
     } else if (consumeOrder == ConsumeOrder.YOUNGEST) {
       for (File candidateFile: candidateFiles) {
-        if (candidateFile.lastModified() >
-          selectedFile.lastModified()) {
+        long compare = selectedFile.lastModified() -
+            candidateFile.lastModified();
+        if (compare == 0) { // ts is same pick smallest lexicographically.
+          selectedFile = smallerLexicographical(selectedFile, candidateFile);
+        } else if (compare < 0) { // candidate is younger (cand-ts > selec-ts)
           selectedFile = candidateFile;
         }
       }
     } else { // default order is OLDEST
       for (File candidateFile: candidateFiles) {
-        if (candidateFile.lastModified() <
-          selectedFile.lastModified()) {
+        long compare = selectedFile.lastModified() -
+            candidateFile.lastModified();
+        if (compare == 0) { // ts is same pick smallest lexicographically.
+          selectedFile = smallerLexicographical(selectedFile, candidateFile);
+        } else if (compare > 0) { // candidate is older (cand-ts < selec-ts).
           selectedFile = candidateFile;
         }
       }
@@ -451,7 +458,13 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
 
     return openFile(selectedFile);
   }
-  
+
+  private File smallerLexicographical(File f1, File f2) {
+    // Tie-breaker by file name: f1 is chosen only when its name sorts
+    // strictly before f2's; when the names are equal, f2 is returned.
+    boolean firstSortsEarlier = f1.getName().compareTo(f2.getName()) < 0;
+    return firstSortsEarlier ? f1 : f2;
+  }
   /**
    * Opens a file for consuming
    * @param file

http://git-wip-us.apache.org/repos/asf/flume/blob/62b383a0/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
index 0b07e7a..6a02612 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
@@ -203,36 +203,42 @@ public class TestReliableSpoolingFileEventReader {
   
   @Test
   public void testConsumeFileRandomly() throws IOException {
-    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
+    ReliableEventReader reader
+      = new ReliableSpoolingFileEventReader.Builder()
     .spoolDirectory(WORK_DIR)
     .consumeOrder(ConsumeOrder.RANDOM)
     .build();
     File fileName = new File(WORK_DIR, "new-file");
-    FileUtils.write(fileName, "New file created in the end. Shoud be read randomly.\n");
-    Set<String> actual = Sets.newHashSet();     
+    FileUtils.write(fileName,
+      "New file created in the end. Shoud be read randomly.\n");
+    Set<String> actual = Sets.newHashSet();
     readEventsForFilesInDir(WORK_DIR, reader, actual);      
     Set<String> expected = Sets.newHashSet();
     createExpectedFromFilesInSetup(expected);
     expected.add("");
-    expected.add("New file created in the end. Shoud be read randomly.");
+    expected.add(
+      "New file created in the end. Shoud be read randomly.");
     Assert.assertEquals(expected, actual);    
   }
 
 
   @Test
   public void testConsumeFileOldest() throws IOException, InterruptedException {
-    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
-    .spoolDirectory(WORK_DIR)
-    .consumeOrder(ConsumeOrder.OLDEST)
-    .build();
+    ReliableEventReader reader
+      = new ReliableSpoolingFileEventReader.Builder()
+      .spoolDirectory(WORK_DIR)
+      .consumeOrder(ConsumeOrder.OLDEST)
+      .build();
     File file1 = new File(WORK_DIR, "new-file1");   
     File file2 = new File(WORK_DIR, "new-file2");    
     File file3 = new File(WORK_DIR, "new-file3");
-    FileUtils.write(file2, "New file2 created.\n"); // file2 becoming older than file1 & file3
     Thread.sleep(1000L);
-    FileUtils.write(file1, "New file1 created.\n"); // file1 becoming older than file3
+    FileUtils.write(file2, "New file2 created.\n");
+    Thread.sleep(1000L);
+    FileUtils.write(file1, "New file1 created.\n");
+    Thread.sleep(1000L);
     FileUtils.write(file3, "New file3 created.\n");
-    
+    // order of age oldest to youngest (file2, file1, file3)
     List<String> actual = Lists.newLinkedList();    
     readEventsForFilesInDir(WORK_DIR, reader, actual);        
     List<String> expected = Lists.newLinkedList();
@@ -245,25 +251,30 @@ public class TestReliableSpoolingFileEventReader {
   }
   
   @Test
-  public void testConsumeFileYoungest() throws IOException, InterruptedException {
-    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
-    .spoolDirectory(WORK_DIR)
-    .consumeOrder(ConsumeOrder.YOUNGEST)
-    .build();
-    Thread.sleep(1000L);
-    File file1 = new File(WORK_DIR, "new-file1");   
-    File file2 = new File(WORK_DIR, "new-file2");    
+  public void testConsumeFileYoungest()
+    throws IOException, InterruptedException {
+    ReliableEventReader reader
+      = new ReliableSpoolingFileEventReader.Builder()
+      .spoolDirectory(WORK_DIR)
+      .consumeOrder(ConsumeOrder.YOUNGEST)
+      .build();
+    File file1 = new File(WORK_DIR, "new-file1");
+    File file2 = new File(WORK_DIR, "new-file2");
     File file3 = new File(WORK_DIR, "new-file3");
-    FileUtils.write(file2, "New file2 created.\n"); // file2 is oldest among file1 & file3.
-    Thread.sleep(1000L);      
-    FileUtils.write(file3, "New file3 created.\n"); // file3 becomes youngest then file2 but older from file1. 
-    FileUtils.write(file1, "New file1 created.\n"); // file1 becomes youngest in file2 & file3.
+    Thread.sleep(1000L);
+    FileUtils.write(file2, "New file2 created.\n");
+    Thread.sleep(1000L);
+    FileUtils.write(file3, "New file3 created.\n");
+    Thread.sleep(1000L);
+    FileUtils.write(file1, "New file1 created.\n");
+    // order of age youngest to oldest (file1, file3, file2)
     List<String> actual = Lists.newLinkedList();    
     readEventsForFilesInDir(WORK_DIR, reader, actual);        
     List<String> expected = Lists.newLinkedList();
     createExpectedFromFilesInSetup(expected);
     Collections.sort(expected);
-    expected.add(0, ""); // Empty Line file was added in the last in Setup.
+    // The empty-line file was the last one added in setup.
+    expected.add(0, "");
     expected.add(0, "New file2 created.");    
     expected.add(0, "New file3 created.");
     expected.add(0, "New file1 created.");
@@ -271,6 +282,66 @@ public class TestReliableSpoolingFileEventReader {
     Assert.assertEquals(expected, actual);
   }
 
+  /** Files with equal mtimes must be consumed in name order (OLDEST). */
+  @Test
+  public void testConsumeFileOldestWithLexicographicalComparision()
+    throws IOException, InterruptedException {
+    ReliableEventReader reader
+      = new ReliableSpoolingFileEventReader.Builder()
+      .spoolDirectory(WORK_DIR).consumeOrder(ConsumeOrder.OLDEST)
+      .build();
+    File file1 = new File(WORK_DIR, "new-file1");
+    File file2 = new File(WORK_DIR, "new-file2");
+    File file3 = new File(WORK_DIR, "new-file3");
+    Thread.sleep(1000L); // presumably separates mtimes from setup files
+    FileUtils.write(file3, "New file3 created.\n");
+    FileUtils.write(file2, "New file2 created.\n");
+    FileUtils.write(file1, "New file1 created.\n");
+    file1.setLastModified(file3.lastModified());
+    file2.setLastModified(file3.lastModified());
+    // All three files now carry file3's timestamp, so the reader has to
+    // break the tie lexicographically (file1, file2, file3).
+    List<String> actual = Lists.newLinkedList();
+    readEventsForFilesInDir(WORK_DIR, reader, actual);
+    List<String> expected = Lists.newLinkedList();
+    createExpectedFromFilesInSetup(expected);
+    expected.add(""); // The empty file was the last one added in setup.
+    expected.add("New file1 created.");
+    expected.add("New file2 created.");
+    expected.add("New file3 created.");
+    Assert.assertEquals(expected, actual);
+  }
+
+  /** Files with equal mtimes must be consumed in name order (YOUNGEST). */
+  @Test
+  public void testConsumeFileYoungestWithLexicographicalComparision()
+    throws IOException, InterruptedException {
+    ReliableEventReader reader
+      = new ReliableSpoolingFileEventReader.Builder()
+      .spoolDirectory(WORK_DIR).consumeOrder(ConsumeOrder.YOUNGEST)
+      .build();
+    File file1 = new File(WORK_DIR, "new-file1");
+    File file2 = new File(WORK_DIR, "new-file2");
+    File file3 = new File(WORK_DIR, "new-file3");
+    Thread.sleep(1000L); // presumably separates mtimes from setup files
+    FileUtils.write(file1, "New file1 created.\n");
+    FileUtils.write(file2, "New file2 created.\n");
+    FileUtils.write(file3, "New file3 created.\n");
+    file1.setLastModified(file3.lastModified());
+    file2.setLastModified(file3.lastModified());
+    // All three files now carry file3's timestamp, so the reader has to
+    // break the tie lexicographically (file1, file2, file3).
+    List<String> actual = Lists.newLinkedList();
+    readEventsForFilesInDir(WORK_DIR, reader, actual);
+    List<String> expected = Lists.newLinkedList();
+    createExpectedFromFilesInSetup(expected);
+    expected.add(0, ""); // The empty file was the last one added in setup.
+    expected.add(0, "New file3 created.");
+    expected.add(0, "New file2 created.");
+    expected.add(0, "New file1 created.");
+    Assert.assertEquals(expected, actual);
+  }
+
   @Test public void testLargeNumberOfFilesOLDEST() throws IOException {    
     templateTestForLargeNumberOfFiles(ConsumeOrder.OLDEST, null, 1000);
   }
@@ -291,9 +362,12 @@ public class TestReliableSpoolingFileEventReader {
       int N) throws IOException {
     File dir = null;
     try {
-      dir = new File("target/test/work/" + this.getClass().getSimpleName()+ "_large");
+      dir = new File(
+        "target/test/work/" + this.getClass().getSimpleName() +
+          "_large");
       Files.createParentDirs(new File(dir, "dummy"));
-      ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
+      ReliableEventReader reader
+        = new ReliableSpoolingFileEventReader.Builder()
       .spoolDirectory(dir).consumeOrder(order).build();
       Map<Long, List<String>> expected;
       if (comparator == null) {
@@ -328,8 +402,10 @@ public class TestReliableSpoolingFileEventReader {
           if (order == ConsumeOrder.RANDOM) {            
             Assert.assertTrue(expectedList.remove(new String(e.getBody())));
           } else {
-            Assert.assertEquals(((ArrayList<String>)expectedList).get(0), new String(e.getBody()));            
-            ((ArrayList<String>)expectedList).remove(0);
+            Assert.assertEquals(
+              ((ArrayList<String>) expectedList).get(0),
+              new String(e.getBody()));
+            ((ArrayList<String>) expectedList).remove(0);
           }
         }
         reader.commit();