You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2016/07/09 00:36:05 UTC

flume git commit: FLUME-2939. Update recursive SpoolDir source to use Java 7 APIs

Repository: flume
Updated Branches:
  refs/heads/trunk cfbf11568 -> 7013708ba


FLUME-2939. Update recursive SpoolDir source to use Java 7 APIs

(Bessenyei Balázs Donát via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/7013708b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/7013708b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/7013708b

Branch: refs/heads/trunk
Commit: 7013708baddc8ed7d861797d1fd8280a94b6025c
Parents: cfbf115
Author: Mike Percy <mp...@apache.org>
Authored: Fri Jul 8 17:32:09 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Fri Jul 8 17:32:09 2016 -0700

----------------------------------------------------------------------
 .../avro/ReliableSpoolingFileEventReader.java   | 95 ++++++++++----------
 1 file changed, 49 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/7013708b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index 4dc0207..ca5308c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.io.Files;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.FlumeException;
@@ -43,10 +42,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileFilter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -160,8 +163,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     try {
       File canary = File.createTempFile("flume-spooldir-perm-check-", ".canary",
           spoolDirectory);
-      Files.write("testing flume file permissions\n", canary, Charsets.UTF_8);
-      List<String> lines = Files.readLines(canary, Charsets.UTF_8);
+      Files.write(canary.toPath(), "testing flume file permissions\n".getBytes());
+      List<String> lines = Files.readAllLines(canary.toPath(), Charsets.UTF_8);
       Preconditions.checkState(!lines.isEmpty(), "Empty canary file %s", canary);
       if (!canary.delete()) {
         throw new IOException("Unable to delete canary file " + canary);
@@ -216,49 +219,46 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
   }
 
   /**
-   * Filter to exclude files/directories either hidden, finished, or names matching the ignore pattern
-   */
-  final FileFilter filter = new FileFilter() {
-    public boolean accept(File candidate) {
-      if (candidate.isDirectory()) {
-        String directoryName = candidate.getName();
-        if (!recursiveDirectorySearch ||
-            directoryName.startsWith(".") ||
-            ignorePattern.matcher(directoryName).matches()) {
-
-          return false;
-        }
-        return true;
-      }
-      String fileName = candidate.getName();
-      if (fileName.endsWith(completedSuffix) ||
-          fileName.startsWith(".") ||
-          ignorePattern.matcher(fileName).matches()) {
-        return false;
-      }
-
-      return true;
-    }
-  };
-
-  /**
    * Recursively gather candidate files
    * @param directory the directory to gather files from
    * @return list of files within the passed in directory
    */
-  private List<File> getCandidateFiles(File directory) {
+  private List<File> getCandidateFiles(final Path directory) {
     Preconditions.checkNotNull(directory);
-    List<File> candidateFiles = new ArrayList<File>();
-    if (!directory.isDirectory()) {
-      return candidateFiles;
-    }
+    final List<File> candidateFiles = new ArrayList<>();
+    try {
+      Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
+        @Override
+        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
+            throws IOException {
+          if (directory.equals(dir)) { // The top directory should always be listed
+            return FileVisitResult.CONTINUE;
+          }
+          String directoryName = dir.getFileName().toString();
+          if (!recursiveDirectorySearch ||
+              directoryName.startsWith(".") ||
+              ignorePattern.matcher(directoryName).matches()) {
+            return FileVisitResult.SKIP_SUBTREE;
+          }
+          return FileVisitResult.CONTINUE;
+        }
 
-    for (File file : directory.listFiles(filter)) {
-      if (file.isDirectory()) {
-        candidateFiles.addAll(getCandidateFiles(file));
-      } else {
-        candidateFiles.add(file);
-      }
+        @Override
+        public FileVisitResult visitFile(Path candidate, BasicFileAttributes attrs)
+            throws IOException {
+          String fileName = candidate.getFileName().toString();
+          if (!fileName.endsWith(completedSuffix) &&
+              !fileName.startsWith(".") &&
+              !ignorePattern.matcher(fileName).matches()) {
+            candidateFiles.add(candidate.toFile());
+          }
+
+          return FileVisitResult.CONTINUE;
+        }
+      });
+    } catch (IOException e) {
+      logger.error("I/O exception occurred while listing directories. " +
+                   "Files already matched will be returned. " + directory, e);
     }
 
     return candidateFiles;
@@ -315,7 +315,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
      * If so, try to roll to the next file, if there is one.
      * Loop until events is not empty or there is no next file in case of 0 byte files */
     while (events.isEmpty()) {
-      logger.info("Last read took us just up to a file boundary. Rolling to the next file, if there is one.");
+      logger.info("Last read took us just up to a file boundary. " +
+                  "Rolling to the next file, if there is one.");
       retireCurrentFile();
       currentFile = getNextFile();
       if (!currentFile.isPresent()) {
@@ -417,7 +418,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
        * file was already rolled but the rename was not atomic. If that seems
        * likely, we let it pass with only a warning.
        */
-      if (Files.equal(currentFile.get().getFile(), dest)) {
+      if (Files.isSameFile(currentFile.get().getFile().toPath(), dest.toPath())) {
         logger.warn("Completed file " + dest +
             " already exists, but files match, so continuing.");
         boolean deleted = fileToRoll.delete();
@@ -494,7 +495,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     if (consumeOrder != ConsumeOrder.RANDOM ||
         candidateFileIter == null ||
         !candidateFileIter.hasNext()) {
-      candidateFiles = getCandidateFiles(spoolDirectory);
+      candidateFiles = getCandidateFiles(spoolDirectory.toPath());
       listFilesCount++;
       candidateFileIter = candidateFiles.iterator();
     }
@@ -540,7 +541,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
   /**
    * Opens a file for consuming
    * @param file
-   * @return {@link #FileInfo} for the file to consume or absent option if the
+   * @return {@link FileInfo} for the file to consume or absent option if the
    * file does not exists or readable.
    */
   private Optional<FileInfo> openFile(File file) {    
@@ -584,7 +585,9 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     }
   }
 
-  /** An immutable class with information about a file being processed. */
+  /**
+   * An immutable class with information about a file being processed.
+   */
   private static class FileInfo {
     private final File file;
     private final long length;