You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2016/07/09 00:36:05 UTC
flume git commit: FLUME-2939. Update recursive SpoolDir source to use
Java 7 APIs
Repository: flume
Updated Branches:
refs/heads/trunk cfbf11568 -> 7013708ba
FLUME-2939. Update recursive SpoolDir source to use Java 7 APIs
(Bessenyei Balázs Donát via Mike Percy)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/7013708b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/7013708b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/7013708b
Branch: refs/heads/trunk
Commit: 7013708baddc8ed7d861797d1fd8280a94b6025c
Parents: cfbf115
Author: Mike Percy <mp...@apache.org>
Authored: Fri Jul 8 17:32:09 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Fri Jul 8 17:32:09 2016 -0700
----------------------------------------------------------------------
.../avro/ReliableSpoolingFileEventReader.java | 95 ++++++++++----------
1 file changed, 49 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/7013708b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index 4dc0207..ca5308c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.io.Files;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
@@ -43,10 +42,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.Charset;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -160,8 +163,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
try {
File canary = File.createTempFile("flume-spooldir-perm-check-", ".canary",
spoolDirectory);
- Files.write("testing flume file permissions\n", canary, Charsets.UTF_8);
- List<String> lines = Files.readLines(canary, Charsets.UTF_8);
+ Files.write(canary.toPath(), "testing flume file permissions\n".getBytes());
+ List<String> lines = Files.readAllLines(canary.toPath(), Charsets.UTF_8);
Preconditions.checkState(!lines.isEmpty(), "Empty canary file %s", canary);
if (!canary.delete()) {
throw new IOException("Unable to delete canary file " + canary);
@@ -216,49 +219,46 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
}
/**
- * Filter to exclude files/directories either hidden, finished, or names matching the ignore pattern
- */
- final FileFilter filter = new FileFilter() {
- public boolean accept(File candidate) {
- if (candidate.isDirectory()) {
- String directoryName = candidate.getName();
- if (!recursiveDirectorySearch ||
- directoryName.startsWith(".") ||
- ignorePattern.matcher(directoryName).matches()) {
-
- return false;
- }
- return true;
- }
- String fileName = candidate.getName();
- if (fileName.endsWith(completedSuffix) ||
- fileName.startsWith(".") ||
- ignorePattern.matcher(fileName).matches()) {
- return false;
- }
-
- return true;
- }
- };
-
- /**
* Recursively gather candidate files
* @param directory the directory to gather files from
* @return list of files within the passed in directory
*/
- private List<File> getCandidateFiles(File directory) {
+ private List<File> getCandidateFiles(final Path directory) {
Preconditions.checkNotNull(directory);
- List<File> candidateFiles = new ArrayList<File>();
- if (!directory.isDirectory()) {
- return candidateFiles;
- }
+ final List<File> candidateFiles = new ArrayList<>();
+ try {
+ Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
+ throws IOException {
+ if (directory.equals(dir)) { // The top directory should always be listed
+ return FileVisitResult.CONTINUE;
+ }
+ String directoryName = dir.getFileName().toString();
+ if (!recursiveDirectorySearch ||
+ directoryName.startsWith(".") ||
+ ignorePattern.matcher(directoryName).matches()) {
+ return FileVisitResult.SKIP_SUBTREE;
+ }
+ return FileVisitResult.CONTINUE;
+ }
- for (File file : directory.listFiles(filter)) {
- if (file.isDirectory()) {
- candidateFiles.addAll(getCandidateFiles(file));
- } else {
- candidateFiles.add(file);
- }
+ @Override
+ public FileVisitResult visitFile(Path candidate, BasicFileAttributes attrs)
+ throws IOException {
+ String fileName = candidate.getFileName().toString();
+ if (!fileName.endsWith(completedSuffix) &&
+ !fileName.startsWith(".") &&
+ !ignorePattern.matcher(fileName).matches()) {
+ candidateFiles.add(candidate.toFile());
+ }
+
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ } catch (IOException e) {
+ logger.error("I/O exception occurred while listing directories. " +
+ "Files already matched will be returned. " + directory, e);
}
return candidateFiles;
@@ -315,7 +315,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
* If so, try to roll to the next file, if there is one.
* Loop until events is not empty or there is no next file in case of 0 byte files */
while (events.isEmpty()) {
- logger.info("Last read took us just up to a file boundary. Rolling to the next file, if there is one.");
+ logger.info("Last read took us just up to a file boundary. " +
+ "Rolling to the next file, if there is one.");
retireCurrentFile();
currentFile = getNextFile();
if (!currentFile.isPresent()) {
@@ -417,7 +418,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
* file was already rolled but the rename was not atomic. If that seems
* likely, we let it pass with only a warning.
*/
- if (Files.equal(currentFile.get().getFile(), dest)) {
+ if (Files.isSameFile(currentFile.get().getFile().toPath(), dest.toPath())) {
logger.warn("Completed file " + dest +
" already exists, but files match, so continuing.");
boolean deleted = fileToRoll.delete();
@@ -494,7 +495,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
if (consumeOrder != ConsumeOrder.RANDOM ||
candidateFileIter == null ||
!candidateFileIter.hasNext()) {
- candidateFiles = getCandidateFiles(spoolDirectory);
+ candidateFiles = getCandidateFiles(spoolDirectory.toPath());
listFilesCount++;
candidateFileIter = candidateFiles.iterator();
}
@@ -540,7 +541,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
/**
* Opens a file for consuming
* @param file
- * @return {@link #FileInfo} for the file to consume or absent option if the
+ * @return {@link FileInfo} for the file to consume or absent option if the
* file does not exists or readable.
*/
private Optional<FileInfo> openFile(File file) {
@@ -584,7 +585,9 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
}
}
- /** An immutable class with information about a file being processed. */
+ /**
+ * An immutable class with information about a file being processed.
+ */
private static class FileInfo {
private final File file;
private final long length;