You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by sz...@apache.org on 2018/08/17 13:13:47 UTC

flume git commit: FLUME-3239 Do not rename files in SpoolDirectorySource

Repository: flume
Updated Branches:
  refs/heads/trunk 20338fc67 -> 368776ff7


FLUME-3239 Do not rename files in SpoolDirectorySource

Added functionality to track files in the meta directory
rather than renaming them.
Improved tests for checking multilevel directories.

This closes #214

Reviewers: Ferenc Szabo, Peter Turcsanyi

(Endre Major via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/368776ff
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/368776ff
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/368776ff

Branch: refs/heads/trunk
Commit: 368776ff77be7939b5e84a431df052cece610a07
Parents: 20338fc
Author: Endre Major <em...@cloudera.com>
Authored: Fri Aug 17 15:12:47 2018 +0200
Committer: Ferenc Szabo <sz...@apache.org>
Committed: Fri Aug 17 15:12:47 2018 +0200

----------------------------------------------------------------------
 .../avro/ReliableSpoolingFileEventReader.java   | 102 ++++++++++-
 .../flume/source/SpoolDirectorySource.java      |   3 +
 ...olDirectorySourceConfigurationConstants.java |   3 +
 .../TestReliableSpoolingFileEventReader.java    | 174 +++++++++++--------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |   8 +-
 5 files changed, 209 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/368776ff/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index 1e1d955..830d21b 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -49,13 +49,16 @@ import java.nio.charset.Charset;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
+import java.util.Set;
 import java.util.regex.Pattern;
 
 /**
@@ -89,17 +92,20 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
 
   static final String metaFileName = ".flumespool-main.meta";
   private final File spoolDirectory;
+  private final Path spoolDirPath;
   private final String completedSuffix;
   private final String deserializerType;
   private final Context deserializerContext;
   private final Pattern includePattern;
   private final Pattern ignorePattern;
   private final File metaFile;
+  private File trackerDirectory;
   private final boolean annotateFileName;
   private final boolean annotateBaseName;
   private final String fileNameHeader;
   private final String baseNameHeader;
   private final String deletePolicy;
+  private final TrackingPolicy trackingPolicy;
   private final Charset inputCharset;
   private final DecodeErrorPolicy decodeErrorPolicy;
   private final ConsumeOrder consumeOrder;
@@ -115,6 +121,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
   private Iterator<File> candidateFileIter = null;
   private int listFilesCount = 0;
 
+  private String trackerDirectoryAbsolutePath;
+
   /**
    * Create a ReliableSpoolingFileEventReader to watch the given directory.
    */
@@ -123,7 +131,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
       boolean annotateFileName, String fileNameHeader,
       boolean annotateBaseName, String baseNameHeader,
       String deserializerType, Context deserializerContext,
-      String deletePolicy, String inputCharset,
+      String deletePolicy, String trackingPolicy, String inputCharset,
       DecodeErrorPolicy decodeErrorPolicy,
       ConsumeOrder consumeOrder,
       boolean recursiveDirectorySearch) throws IOException {
@@ -137,6 +145,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     Preconditions.checkNotNull(deserializerType);
     Preconditions.checkNotNull(deserializerContext);
     Preconditions.checkNotNull(deletePolicy);
+    Preconditions.checkNotNull(trackingPolicy);
     Preconditions.checkNotNull(inputCharset);
 
     // validate delete policy
@@ -146,6 +155,13 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
           "NEVER and IMMEDIATE are not yet supported");
     }
 
+    // validate tracking policy
+    if (!trackingPolicy.equalsIgnoreCase(TrackingPolicy.RENAME.name()) &&
+            !trackingPolicy.equalsIgnoreCase(TrackingPolicy.TRACKER_DIR.name())) {
+      throw new IllegalArgumentException("Tracking policies other than " +
+              "RENAME and TRACKER_DIR are not supported");
+    }
+
     if (logger.isDebugEnabled()) {
       logger.debug("Initializing {} with directory={}, metaDir={}, " +
                    "deserializer={}",
@@ -188,12 +204,13 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     this.includePattern = Pattern.compile(includePattern);
     this.ignorePattern = Pattern.compile(ignorePattern);
     this.deletePolicy = deletePolicy;
+    this.trackingPolicy = TrackingPolicy.valueOf(trackingPolicy.toUpperCase());
     this.inputCharset = Charset.forName(inputCharset);
     this.decodeErrorPolicy = Preconditions.checkNotNull(decodeErrorPolicy);
     this.consumeOrder = Preconditions.checkNotNull(consumeOrder);
     this.recursiveDirectorySearch = recursiveDirectorySearch;
 
-    File trackerDirectory = new File(trackerDirPath);
+    trackerDirectory = new File(trackerDirPath);
 
     // if relative path, treat as relative to spool directory
     if (!trackerDirectory.isAbsolute()) {
@@ -219,6 +236,10 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     if (metaFile.exists() && metaFile.length() == 0) {
       deleteMetaFile();
     }
+
+    spoolDirPath = Paths.get(spoolDirectory.getAbsolutePath());
+    trackerDirectoryAbsolutePath = trackerDirectory.getAbsolutePath();
+
   }
 
   /**
@@ -231,6 +252,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     Preconditions.checkNotNull(directory);
     final List<File> candidateFiles = new ArrayList<>();
     try {
+      final Set<Path> trackerDirCompletedFiles = getTrackerDirCompletedFiles();
       Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
         @Override
         public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
@@ -252,6 +274,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
             throws IOException {
           String fileName = candidate.getFileName().toString();
           if (!fileName.endsWith(completedSuffix) &&
+              !isFileInTrackerDir(trackerDirCompletedFiles, candidate) &&
               !fileName.startsWith(".") &&
               includePattern.matcher(fileName).matches() &&
               !ignorePattern.matcher(fileName).matches()) {
@@ -269,6 +292,38 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     return candidateFiles;
   }
 
+  private Set<Path> getTrackerDirCompletedFiles() throws IOException {
+    final Set<Path> completedFiles = new HashSet<>();
+    if (TrackingPolicy.TRACKER_DIR != trackingPolicy) {
+      return completedFiles;
+    }
+
+    Path trackerDirPath = trackerDirectory.toPath();
+    Files.walkFileTree(trackerDirPath, new SimpleFileVisitor<Path>() {
+
+      @Override
+      public FileVisitResult visitFile(Path candidate, BasicFileAttributes attrs)
+              throws IOException {
+        String fileName = candidate.getFileName().toString();
+        if (fileName.endsWith(completedSuffix)) {
+          completedFiles.add(candidate.toAbsolutePath());
+        }
+        return FileVisitResult.CONTINUE;
+      }
+    });
+    return completedFiles;
+  }
+
+  private boolean isFileInTrackerDir(Set<Path> completedFiles, Path path) {
+    Path relPath = getRelPathToSpoolDir(path);
+    Path trackerPath = Paths.get(trackerDirectoryAbsolutePath, relPath.toString() + completedSuffix);
+    return completedFiles.contains(trackerPath);
+  }
+
+  private Path getRelPathToSpoolDir(Path path) {
+    return spoolDirPath.relativize(path.toAbsolutePath());
+  }
+
   @VisibleForTesting
   int getListFilesCount() {
     return listFilesCount;
@@ -411,7 +466,11 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     }
 
     if (deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name())) {
-      rollCurrentFile(fileToRoll);
+      if (trackingPolicy == TrackingPolicy.RENAME) {
+        rollCurrentFile(fileToRoll);
+      } else {
+        rollCurrentFileInTrackerDir(fileToRoll);
+      }
     } else if (deletePolicy.equalsIgnoreCase(DeletePolicy.IMMEDIATE.name())) {
       deleteCurrentFile(fileToRoll);
     } else {
@@ -482,6 +541,24 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     }
   }
 
+  private void rollCurrentFileInTrackerDir(File fileToRoll) throws IOException {
+    Path path = fileToRoll.toPath();
+    Path relToRoll = getRelPathToSpoolDir(path);
+
+    File dest = new File(trackerDirectory.getPath(), relToRoll + completedSuffix);
+    logger.info("Preparing to create tracker file for {} at {}", fileToRoll, dest);
+    if (dest.exists()) {
+      String message = "File name has been re-used with different" +
+              " files. Spooling assumptions violated for " + dest;
+      throw new IllegalStateException(message);
+    }
+    //Create an empty file as an indicator
+    dest.getParentFile().mkdirs(); //create the parent dirs first
+    if (!dest.createNewFile()) {
+      throw new IOException("Could not create tracker file: " + dest);
+    }
+  }
+
   /**
    * Delete the given spooled file
    *
@@ -649,12 +726,20 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
 
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
-  static enum DeletePolicy {
+  enum DeletePolicy {
     NEVER,
     IMMEDIATE,
     DELAY
   }
 
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public enum TrackingPolicy {
+    RENAME,
+    TRACKER_DIR
+  }
+
+
   /**
    * Special builder class for ReliableSpoolingFileEventReader
    */
@@ -681,6 +766,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     private Context deserializerContext = new Context();
     private String deletePolicy =
         SpoolDirectorySourceConfigurationConstants.DEFAULT_DELETE_POLICY;
+    private String trackingPolicy =
+            SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKING_POLICY;
     private String inputCharset =
         SpoolDirectorySourceConfigurationConstants.DEFAULT_INPUT_CHARSET;
     private DecodeErrorPolicy decodeErrorPolicy = DecodeErrorPolicy.valueOf(
@@ -751,6 +838,11 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
       return this;
     }
 
+    public Builder trackingPolicy(String trackingPolicy) {
+      this.trackingPolicy = trackingPolicy;
+      return this;
+    }
+
     public Builder inputCharset(String inputCharset) {
       this.inputCharset = inputCharset;
       return this;
@@ -775,7 +867,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
       return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix,
           includePattern, ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
           annotateBaseName, baseNameHeader, deserializerType,
-          deserializerContext, deletePolicy, inputCharset, decodeErrorPolicy,
+          deserializerContext, deletePolicy, trackingPolicy, inputCharset, decodeErrorPolicy,
           consumeOrder, recursiveDirectorySearch);
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/368776ff/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 107a381..68bbe7f 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -77,6 +77,7 @@ public class SpoolDirectorySource extends AbstractSource
   private ConsumeOrder consumeOrder;
   private int pollDelay;
   private boolean recursiveDirectorySearch;
+  private String trackingPolicy;
 
   @Override
   public synchronized void start() {
@@ -104,6 +105,7 @@ public class SpoolDirectorySource extends AbstractSource
           .decodeErrorPolicy(decodeErrorPolicy)
           .consumeOrder(consumeOrder)
           .recursiveDirectorySearch(recursiveDirectorySearch)
+          .trackingPolicy(trackingPolicy)
           .build();
     } catch (IOException ioe) {
       throw new FlumeException("Error instantiating spooling event parser",
@@ -193,6 +195,7 @@ public class SpoolDirectorySource extends AbstractSource
     if (sourceCounter == null) {
       sourceCounter = new SourceCounter(getName());
     }
+    trackingPolicy = context.getString(TRACKING_POLICY, DEFAULT_TRACKING_POLICY);
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flume/blob/368776ff/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
index a065dc0..3f16e0f 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
@@ -78,6 +78,9 @@ public class SpoolDirectorySourceConfigurationConstants {
   public static final String DELETE_POLICY = "deletePolicy";
   public static final String DEFAULT_DELETE_POLICY = "never";
 
+  public static final String TRACKING_POLICY = "trackingPolicy";
+  public static final String DEFAULT_TRACKING_POLICY = "rename";
+
   /** Character set used when reading the input. */
   public static final String INPUT_CHARSET = "inputCharset";
   public static final String DEFAULT_INPUT_CHARSET = "UTF-8";

http://git-wip-us.apache.org/repos/asf/flume/blob/368776ff/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
index b257999..422252e 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
@@ -39,8 +39,10 @@ import java.util.concurrent.Semaphore;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.SystemUtils;
+import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.flume.Event;
 import org.apache.flume.client.avro.ReliableSpoolingFileEventReader.DeletePolicy;
+import org.apache.flume.client.avro.ReliableSpoolingFileEventReader.TrackingPolicy;
 import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
 import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.ConsumeOrder;
 import org.junit.After;
@@ -94,26 +96,10 @@ public class TestReliableSpoolingFileEventReader {
 
   private void deleteDir(File dir) {
     // delete all the files & dirs we created
-    File[] files = dir.listFiles();
-    for (File f : files) {
-      if (f.isDirectory()) {
-        File[] subDirFiles = f.listFiles();
-        for (File sdf : subDirFiles) {
-          if (!sdf.delete()) {
-            logger.warn("Cannot delete file {}", sdf.getAbsolutePath());
-          }
-        }
-        if (!f.delete()) {
-          logger.warn("Cannot delete directory {}", f.getAbsolutePath());
-        }
-      } else {
-        if (!f.delete()) {
-          logger.warn("Cannot delete file {}", f.getAbsolutePath());
-        }
-      }
-    }
-    if (!dir.delete()) {
-      logger.warn("Cannot delete work directory {}", dir.getAbsolutePath());
+    try {
+      FileUtils.deleteDirectory(dir);
+    } catch (IOException e) {
+      logger.warn("Cannot delete work directory {}", dir.getAbsolutePath(), e);
     }
   }
 
@@ -127,7 +113,7 @@ public class TestReliableSpoolingFileEventReader {
 
   /**
    * Verify if the give dir contains only the given files
-   * 
+   *
    * @param dir
    *          the directory to check
    * @param files
@@ -139,12 +125,12 @@ public class TestReliableSpoolingFileEventReader {
 
     List<File> actualFiles = listFiles(dir);
     Set<String> expectedFiles = new HashSet<String>(Arrays.asList(files));
-    
+
     // Verify if the number of files in the dir is the expected
     if (actualFiles.size() != expectedFiles.size()) {
       return false;
     }
-    
+
     // Then check files by name
     for (File f : actualFiles) {
       expectedFiles.remove(f.getName());
@@ -160,7 +146,7 @@ public class TestReliableSpoolingFileEventReader {
         .includePattern("^file2$")
         .deletePolicy(DeletePolicy.IMMEDIATE.toString())
         .build();
-    
+
     String[] beforeFiles = { "file0", "file1", "file2", "file3", "emptylineFile" };
     Assert.assertTrue("Expected " + beforeFiles.length + " files in working dir",
         checkLeftFilesInDir(WORK_DIR, beforeFiles));
@@ -203,13 +189,13 @@ public class TestReliableSpoolingFileEventReader {
     // pattern
     // - emptylineFile: not deleted as not matching ignore pattern but not
     // matching include pattern as well
-    
+
     ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
         .spoolDirectory(WORK_DIR)
         .ignorePattern("^file[013]$")
         .includePattern("^file2$")
         .deletePolicy(DeletePolicy.IMMEDIATE.toString())
-        .build(); 
+        .build();
 
     String[] beforeFiles = { "file0", "file1", "file2", "file3", "emptylineFile" };
     Assert.assertTrue("Expected " + beforeFiles.length + " files in working dir",
@@ -230,14 +216,14 @@ public class TestReliableSpoolingFileEventReader {
     // Expected behavior:
     // - file2: not deleted as both include and ignore patterns match (safety
     // measure: ignore always wins on conflict)
-    
+
     ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
         .spoolDirectory(WORK_DIR)
         .ignorePattern("^file2$")
         .includePattern("^file2$")
         .deletePolicy(DeletePolicy.IMMEDIATE.toString())
         .build();
-    
+
     String[] beforeFiles = { "file0", "file1", "file2", "file3", "emptylineFile" };
     Assert.assertTrue("Expected " + beforeFiles.length + " files in working dir",
         checkLeftFilesInDir(WORK_DIR, beforeFiles));
@@ -327,7 +313,7 @@ public class TestReliableSpoolingFileEventReader {
                                                  .consumeOrder(null)
                                                  .build();
   }
-  
+
   @Test
   public void testConsumeFileRandomly() throws IOException {
     ReliableEventReader reader =
@@ -342,7 +328,7 @@ public class TestReliableSpoolingFileEventReader {
     createExpectedFromFilesInSetup(expected);
     expected.add("");
     expected.add("New file created in the end. Shoud be read randomly.");
-    Assert.assertEquals(expected, actual);    
+    Assert.assertEquals(expected, actual);
   }
 
   @Test
@@ -391,8 +377,8 @@ public class TestReliableSpoolingFileEventReader {
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                      .consumeOrder(ConsumeOrder.OLDEST)
                                                      .build();
-    File file1 = new File(WORK_DIR, "new-file1");   
-    File file2 = new File(WORK_DIR, "new-file2");    
+    File file1 = new File(WORK_DIR, "new-file1");
+    File file2 = new File(WORK_DIR, "new-file2");
     File file3 = new File(WORK_DIR, "new-file3");
     Thread.sleep(1000L);
     FileUtils.write(file2, "New file2 created.\n");
@@ -401,17 +387,17 @@ public class TestReliableSpoolingFileEventReader {
     Thread.sleep(1000L);
     FileUtils.write(file3, "New file3 created.\n");
     // order of age oldest to youngest (file2, file1, file3)
-    List<String> actual = Lists.newLinkedList();    
-    readEventsForFilesInDir(WORK_DIR, reader, actual);        
+    List<String> actual = Lists.newLinkedList();
+    readEventsForFilesInDir(WORK_DIR, reader, actual);
     List<String> expected = Lists.newLinkedList();
     createExpectedFromFilesInSetup(expected);
     expected.add(""); // Empty file was added in the last in setup.
     expected.add("New file2 created.");
     expected.add("New file1 created.");
-    expected.add("New file3 created.");    
+    expected.add("New file3 created.");
     Assert.assertEquals(expected, actual);
   }
-  
+
   @Test
   public void testConsumeFileYoungest() throws IOException, InterruptedException {
     ReliableEventReader reader =
@@ -428,17 +414,17 @@ public class TestReliableSpoolingFileEventReader {
     Thread.sleep(1000L);
     FileUtils.write(file1, "New file1 created.\n");
     // order of age youngest to oldest (file2, file3, file1)
-    List<String> actual = Lists.newLinkedList();    
-    readEventsForFilesInDir(WORK_DIR, reader, actual);        
+    List<String> actual = Lists.newLinkedList();
+    readEventsForFilesInDir(WORK_DIR, reader, actual);
     List<String> expected = Lists.newLinkedList();
     createExpectedFromFilesInSetup(expected);
     Collections.sort(expected);
     // Empty Line file was added in the last in Setup.
     expected.add(0, "");
-    expected.add(0, "New file2 created.");    
+    expected.add(0, "New file2 created.");
     expected.add(0, "New file3 created.");
     expected.add(0, "New file1 created.");
-        
+
     Assert.assertEquals(expected, actual);
   }
 
@@ -500,22 +486,30 @@ public class TestReliableSpoolingFileEventReader {
     Assert.assertEquals(expected, actual);
   }
 
-  @Test public void testLargeNumberOfFilesOLDEST() throws IOException {    
-    templateTestForLargeNumberOfFiles(ConsumeOrder.OLDEST, null, 1000);
+  @Test public void testLargeNumberOfFilesOLDEST() throws IOException {
+    templateTestForRecursiveDirs(ConsumeOrder.OLDEST, null, 3, 3, 37, TrackingPolicy.RENAME);
   }
 
-  @Test public void testLargeNumberOfFilesYOUNGEST() throws IOException {    
-    templateTestForLargeNumberOfFiles(ConsumeOrder.YOUNGEST, new Comparator<Long>() {
+  @Test public void testLargeNumberOfFilesYOUNGEST() throws IOException {
+    templateTestForRecursiveDirs(ConsumeOrder.YOUNGEST, Comparator.reverseOrder(),
+        3, 3, 37, TrackingPolicy.RENAME);
+  }
 
-      @Override
-      public int compare(Long o1, Long o2) {
-        return o2.compareTo(o1);
-      }
-    }, 1000);
+  @Test public void testLargeNumberOfFilesRANDOM() throws IOException {
+    templateTestForRecursiveDirs(ConsumeOrder.RANDOM, null, 3, 3, 37, TrackingPolicy.RENAME);
   }
 
-  @Test public void testLargeNumberOfFilesRANDOM() throws IOException {    
-    templateTestForLargeNumberOfFiles(ConsumeOrder.RANDOM, null, 1000);
+  @Test public void testLargeNumberOfFilesOLDESTTrackerDir() throws IOException {
+    templateTestForRecursiveDirs(ConsumeOrder.OLDEST, null, 3, 3, 10, TrackingPolicy.TRACKER_DIR);
+  }
+
+  @Test public void testLargeNumberOfFilesYOUNGESTTrackerDir() throws IOException {
+    templateTestForRecursiveDirs(ConsumeOrder.YOUNGEST, Comparator.reverseOrder(),
+        3, 3, 10, TrackingPolicy.TRACKER_DIR);
+  }
+
+  @Test public void testLargeNumberOfFilesRANDOMTrackerDir() throws IOException {
+    templateTestForRecursiveDirs(ConsumeOrder.RANDOM, null, 3, 3, 10, TrackingPolicy.TRACKER_DIR);
   }
 
   @Test
@@ -548,65 +542,95 @@ public class TestReliableSpoolingFileEventReader {
     Assert.assertEquals(expectedLines, seenLines);
   }
 
-  private void templateTestForLargeNumberOfFiles(ConsumeOrder order, Comparator<Long> comparator,
-                                                 int N) throws IOException {
+  private void templateTestForRecursiveDirs(ConsumeOrder order, Comparator<Long> comparator, int depth, int dirNum,
+                                            int fileNum, TrackingPolicy trackingPolicy) throws IOException {
     File dir = null;
     try {
       dir = new File("target/test/work/" + this.getClass().getSimpleName() + "_large");
       Files.createParentDirs(new File(dir, "dummy"));
       ReliableEventReader reader =
-          new ReliableSpoolingFileEventReader.Builder().spoolDirectory(dir)
-                                                       .consumeOrder(order)
-                                                       .build();
+              new ReliableSpoolingFileEventReader.Builder().spoolDirectory(dir)
+                      .consumeOrder(order)
+                      .trackingPolicy(trackingPolicy.toString())
+                      .recursiveDirectorySearch(true)
+                      .build();
       Map<Long, List<String>> expected;
       if (comparator == null) {
         expected = new TreeMap<Long, List<String>>();
       } else {
         expected = new TreeMap<Long, List<String>>(comparator);
       }
-      for (int i = 0; i < N; i++) {
-        File f = new File(dir, "file-" + i);
-        String data = "file-" + i;
-        Files.write(data, f, Charsets.UTF_8);
-        if (expected.containsKey(f.lastModified())) {
-          expected.get(f.lastModified()).add(data);
-        } else {
-          expected.put(f.lastModified(), Lists.newArrayList(data));
-        }
-      }
-      Collection<String> expectedList;
+      createMultilevelFiles(dir, 0, depth, dirNum, fileNum, expected, new MutableLong(0L));
+      Collection<String> expectedColl;
+      int index = 0;
       if (order == ConsumeOrder.RANDOM) {
-        expectedList = Sets.newHashSet();
+        expectedColl = Sets.newHashSet();
       } else {
-        expectedList = Lists.newArrayList();
+        expectedColl = new ArrayList<>();
       }
       for (Entry<Long, List<String>> entry : expected.entrySet()) {
         Collections.sort(entry.getValue());
-        expectedList.addAll(entry.getValue());
+        expectedColl.addAll(entry.getValue());
       }
-      for (int i = 0; i < N; i++) {
+
+      int expNum = expectedColl.size();
+      int actualNum = 0;
+      for (int i = 0; i < expNum; i++) {
         List<Event> events;
         events = reader.readEvents(10);
         for (Event e : events) {
+          actualNum++;
           if (order == ConsumeOrder.RANDOM) {
-            Assert.assertTrue(expectedList.remove(new String(e.getBody())));
+            Assert.assertTrue(expectedColl.remove(new String(e.getBody())));
           } else {
-            Assert.assertEquals(((ArrayList<String>) expectedList).get(0), new String(e.getBody()));
-            ((ArrayList<String>) expectedList).remove(0);
+            String exp = ((ArrayList<String>) expectedColl).get(index);
+            String actual = new String(e.getBody());
+            Assert.assertEquals(exp, actual);
+            index++;
           }
         }
         reader.commit();
       }
+      Assert.assertEquals(expNum, actualNum);
     } finally {
       deleteDir(dir);
     }
   }
 
+  private void createMultilevelFiles(File dir, int currDepth, int maxDepth, int dirNum, int fileNum,
+                                     Map<Long, List<String>> expected, MutableLong id) throws IOException {
+    if (currDepth == maxDepth) {
+      createFiles(dir, fileNum, expected, id);
+    } else {
+      for (int i = 0; i < dirNum; i++) {
+        File nextDir = new File(dir, "dir-" + i);
+        nextDir.mkdirs();
+        createMultilevelFiles(nextDir, currDepth + 1, maxDepth, dirNum, fileNum, expected, id);
+      }
+    }
+  }
+
+  private void createFiles(File dir, int fileNum, Map<Long, List<String>> expected, MutableLong id) throws IOException {
+    for (int i = 0; i < fileNum; i++) {
+      File f = new File(dir, "file-" + id);
+      String data = f.getPath();
+      Files.write(data, f, Charsets.UTF_8);
+      long lastMod = id.longValue() * 10000L;
+      f.setLastModified(lastMod);
+      if (expected.containsKey(f.lastModified())) {
+        expected.get(f.lastModified()).add(data);
+      } else {
+        expected.put(f.lastModified(), Lists.newArrayList(data));
+      }
+      id.increment();
+    }
+  }
+
   private void readEventsForFilesInDir(File dir, ReliableEventReader reader,
                                        Collection<String> actual) throws IOException {
     readEventsForFilesInDir(dir, reader, actual, null, null);
   }
-    
+
   /* Read events, one for each file in the given directory. */
   private void readEventsForFilesInDir(File dir, ReliableEventReader reader,
                                        Collection<String> actual, Semaphore semaphore1,

http://git-wip-us.apache.org/repos/asf/flume/blob/368776ff/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 3c93f52..3ec2b68 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1005,7 +1005,8 @@ This source will watch the specified directory for new files, and will parse
 events out of new files as they appear.
 The event parsing logic is pluggable.
 After a given file has been fully read
-into the channel, it is renamed to indicate completion (or optionally deleted).
+into the channel, completion is indicated by renaming the file (the default); alternatively, the file can be deleted,
+or the trackerDir can be used to keep track of processed files.
 
 Unlike the Exec source, this source is reliable and will not miss data, even if
 Flume is restarted or killed. In exchange for this reliability, only immutable,
@@ -1047,6 +1048,11 @@ ignorePattern             ^$              Regular expression specifying which fi
                                           the file is ignored.
 trackerDir                .flumespool     Directory to store metadata related to processing of files.
                                           If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
+trackingPolicy            rename          The tracking policy defines how file processing is tracked. It can be "rename" or
+                                          "tracker_dir". This parameter is only effective if the deletePolicy is "never".
+                                          "rename" - After processing, files are renamed according to the fileSuffix parameter.
+                                          "tracker_dir" - Files are not renamed; instead, a new empty file is created in the trackerDir.
+                                          The tracker file name is the ingested file's path relative to the spoolDir plus the fileSuffix.
 consumeOrder              oldest          In which order files in the spooling directory will be consumed ``oldest``,
                                           ``youngest`` and ``random``. In case of ``oldest`` and ``youngest``, the last modified
                                           time of the files will be used to compare the files. In case of a tie, the file