You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/05/15 07:07:56 UTC
apex-malhar git commit: APEXMALHAR-2447 Added notification in
AbstractFileInputOperator
when scanning the directory for the first time. Implementations can use this
information in different ways, such as notifying downstream operators that a
new directory has been scanned.
Repository: apex-malhar
Updated Branches:
refs/heads/master c128091c7 -> 3a3629841
APEXMALHAR-2447 Added a notification in AbstractFileInputOperator when a directory is scanned for the first time. Implementations can use this information in different ways, such as notifying downstream operators that a new directory has been scanned.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/3a362984
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/3a362984
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/3a362984
Branch: refs/heads/master
Commit: 3a362984143dde0a89fb9538a704ed1f444f0204
Parents: c128091
Author: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Authored: Thu Mar 16 08:27:39 2017 -0700
Committer: Lakshmi Prasanna Velineni <la...@datatorrent.com>
Committed: Sun May 14 21:41:06 2017 -0700
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileInputOperator.java | 40 +++++++++--
.../io/fs/AbstractFileInputOperatorTest.java | 75 ++++++++++++++++++--
2 files changed, 105 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3a362984/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index bc5ebf1..5c34546 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -710,6 +711,23 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
}
/**
+ * Notifies that the directory is being scanned.<br>
+ * Override this method to custom handling. Will be called once
+ */
+ protected void visitDirectory(Path filePath)
+ {
+ }
+
+ private void checkVisitedDirectory(Path path)
+ {
+ String pathString = path.toString();
+ if (!processedFiles.contains(pathString)) {
+ visitDirectory(path);
+ processedFiles.add(pathString);
+ }
+ }
+
+ /**
* Scans the directory for new files.
*/
protected void scanDirectory()
@@ -718,10 +736,19 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
Set<Path> newPaths = scanner.scan(fs, filePath, processedFiles);
for (Path newPath : newPaths) {
- String newPathString = newPath.toString();
- pendingFiles.add(newPathString);
- processedFiles.add(newPathString);
- localProcessedFileCount.increment();
+ try {
+ FileStatus fileStatus = fs.getFileStatus(newPath);
+ if (fileStatus.isDirectory()) {
+ checkVisitedDirectory(newPath);
+ } else {
+ String newPathString = newPath.toString();
+ pendingFiles.add(newPathString);
+ processedFiles.add(newPathString);
+ localProcessedFileCount.increment();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
lastScanMillis = System.currentTimeMillis();
@@ -1059,6 +1086,11 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
LinkedHashSet<Path> pathSet = Sets.newLinkedHashSet();
try {
LOG.debug("Scanning {} with pattern {}", filePath, this.filePatternRegexp);
+ if (!consumedFiles.contains(filePath.toString())) {
+ if (fs.isDirectory(filePath)) {
+ pathSet.add(filePath);
+ }
+ }
FileStatus[] files = fs.listStatus(filePath);
for (FileStatus status : files) {
Path path = status.getPath();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/3a362984/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
index b9cdd67..8acd16a 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
@@ -20,7 +20,6 @@ package com.datatorrent.lib.io.fs;
import java.io.ByteArrayOutputStream;
import java.io.File;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -59,7 +58,6 @@ import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Partitioner.Partition;
import com.datatorrent.api.StatsListener;
-
import com.datatorrent.lib.io.fs.AbstractFileInputOperator.DirectoryScanner;
import com.datatorrent.lib.partitioner.StatelessPartitionerTest.PartitioningContextImpl;
import com.datatorrent.lib.testbench.CollectorTestSink;
@@ -152,6 +150,71 @@ public class AbstractFileInputOperatorTest
}
+ public static class LineOperator extends LineByLineFileInputOperator
+ {
+ Set<String> dirPaths = Sets.newHashSet();
+
+ @Override
+ protected void visitDirectory(Path filePath)
+ {
+ dirPaths.add(Path.getPathWithoutSchemeAndAuthority(filePath).toString());
+ }
+ }
+
+ @Test
+ public void testEmptyDirectory() throws Exception
+ {
+ FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
+ Set<String> dPaths = Sets.newHashSet();
+ dPaths.add(new File(testMeta.dir).getCanonicalPath());
+
+ String subdir01 = "/a";
+ dPaths.add(new File(testMeta.dir + subdir01).getCanonicalPath());
+ FileUtils.forceMkdir((new File(testMeta.dir + subdir01)));
+
+ String subdir02 = "/b";
+ dPaths.add(new File(testMeta.dir + subdir02).getCanonicalPath());
+ FileUtils.forceMkdir(new File(testMeta.dir + subdir02));
+
+ String subdir03 = subdir02 + "/c";
+ dPaths.add(new File(testMeta.dir + subdir03).getCanonicalPath());
+ FileUtils.forceMkdir(new File(testMeta.dir + subdir03));
+
+ String subdir04 = "/d";
+ List<String> allLines = Lists.newArrayList();
+ HashSet<String> lines = Sets.newHashSet();
+ for (int line = 0; line < 5; line++) {
+ lines.add("f0" + "l" + line);
+ }
+ allLines.addAll(lines);
+ File testFile = new File(testMeta.dir + subdir04, "file0");
+ dPaths.add(new File(testMeta.dir + subdir04).getCanonicalPath());
+ FileUtils.write(testFile, StringUtils.join(lines, '\n'));
+
+ LineOperator oper = new LineOperator();
+ oper.setDirectory(new File(testMeta.dir).getAbsolutePath());
+ oper.setScanIntervalMillis(0);
+
+ CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
+ oper.output.setSink(sink);
+
+ int wid = 0;
+
+ // Read all records to populate processedList in operator.
+ oper.setup(testMeta.context);
+ for (int i = 0; i < 3; i++) {
+ oper.beginWindow(wid);
+ oper.emitTuples();
+ oper.endWindow();
+ wid++;
+ }
+
+ Assert.assertEquals("Size", 5, oper.dirPaths.size());
+ Assert.assertTrue("Checking Sets", dPaths.equals(oper.dirPaths));
+ }
+
@Test
public void testScannerPartitioning() throws Exception
{
@@ -169,10 +232,10 @@ public class AbstractFileInputOperatorTest
Set<Path> allFiles = Sets.newHashSet();
for (DirectoryScanner partition : partitions) {
Set<Path> files = partition.scan(fs, path, Sets.<String>newHashSet());
- Assert.assertEquals("", 2, files.size());
+ Assert.assertEquals("", 3, files.size());
allFiles.addAll(files);
}
- Assert.assertEquals("Found all files " + allFiles, 4, allFiles.size());
+ Assert.assertEquals("Found all files " + allFiles, 5, allFiles.size());
}
@@ -201,7 +264,7 @@ public class AbstractFileInputOperatorTest
Assert.assertNotSame(oper.getScanner(), p.getPartitionedInstance().getScanner());
Set<String> consumed = Sets.newHashSet();
LinkedHashSet<Path> files = p.getPartitionedInstance().getScanner().scan(FileSystem.getLocal(new Configuration(false)), path, consumed);
- Assert.assertEquals("partition " + files, 2, files.size());
+ Assert.assertEquals("partition " + files, 3, files.size());
}
}
@@ -1109,7 +1172,7 @@ public class AbstractFileInputOperatorTest
Assert.assertNotSame(oper.getScanner(), p.getPartitionedInstance().getScanner());
Set<String> consumed = Sets.newHashSet();
LinkedHashSet<Path> files = p.getPartitionedInstance().getScanner().scan(FileSystem.getLocal(new Configuration(false)), path, consumed);
- Assert.assertEquals("partition " + files, 5, files.size());
+ Assert.assertEquals("partition " + files, 6, files.size());
}
}