You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2016/07/06 21:13:01 UTC
apex-malhar git commit: APEXMALHAR-2069: moved creation of
scanService to setup, : throw a RuntimeException if scanService is already set,
fix unit tests to call setup only on WindowDataManager : move the
initialization of most data members from the
Repository: apex-malhar
Updated Branches:
refs/heads/APEXMALHAR-2069 [created] c4a11299b
APEXMALHAR-2069: moved creation of scanService to setup,
: throw a RuntimeException if scanService is already set, fix unit tests to call setup only on WindowDataManager
: move the initialization of most data members from the constructor to setup for the TimeBasedDirectoryScanner class
: move the initialization of most data members from the ctor to setup for FileSplitterInput class
: don't overwrite previous value of referenceTimes in case of restore
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c4a11299
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c4a11299
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c4a11299
Branch: refs/heads/APEXMALHAR-2069
Commit: c4a11299b86f9b5e2b0a44bc9057685c8f3d7663
Parents: 32840a2
Author: Sanjay Pujare <sa...@Sanjay-DT-MacBook-Pro.local>
Authored: Wed Jun 29 18:08:30 2016 -0700
Committer: Sanjay Pujare <sa...@Sanjay-DT-MacBook-Pro.local>
Committed: Wed Jul 6 12:49:17 2016 -0700
----------------------------------------------------------------------
.../lib/io/fs/FileSplitterInput.java | 31 ++++++++++++--------
.../lib/io/fs/FileSplitterInputTest.java | 10 +++----
2 files changed, 23 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c4a11299/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index 077a4ac..b8b21cb 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -80,13 +80,13 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
{
@NotNull
private WindowDataManager windowDataManager;
- @NotNull
- protected final transient LinkedList<ScannedFileInfo> currentWindowRecoveryState;
+
+ protected transient LinkedList<ScannedFileInfo> currentWindowRecoveryState;
@Valid
@NotNull
private TimeBasedDirectoryScanner scanner;
- @NotNull
+
private Map<String, Map<String, Long>> referenceTimes;
private transient long sleepMillis;
@@ -94,15 +94,17 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
public FileSplitterInput()
{
super();
- currentWindowRecoveryState = Lists.newLinkedList();
windowDataManager = new WindowDataManager.NoopWindowDataManager();
- referenceTimes = Maps.newHashMap();
scanner = new TimeBasedDirectoryScanner();
}
@Override
public void setup(Context.OperatorContext context)
{
+ currentWindowRecoveryState = Lists.newLinkedList();
+ if (referenceTimes == null) {
+ referenceTimes = Maps.newHashMap();
+ }
sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
scanner.setup(context);
windowDataManager.setup(context);
@@ -288,12 +290,12 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
protected transient long lastScanMillis;
protected transient FileSystem fs;
- protected final transient LinkedBlockingDeque<ScannedFileInfo> discoveredFiles;
- protected final transient ExecutorService scanService;
- protected final transient AtomicReference<Throwable> atomicThrowable;
+ protected transient LinkedBlockingDeque<ScannedFileInfo> discoveredFiles;
+ protected transient ExecutorService scanService;
+ protected transient AtomicReference<Throwable> atomicThrowable;
private transient volatile boolean running;
- protected final transient HashSet<String> ignoredFiles;
+ protected transient HashSet<String> ignoredFiles;
protected transient Pattern regex;
private transient Pattern ignoreRegex;
@@ -309,15 +311,18 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
recursive = true;
scanIntervalMillis = DEF_SCAN_INTERVAL_MILLIS;
files = Sets.newLinkedHashSet();
- scanService = Executors.newSingleThreadExecutor();
- discoveredFiles = new LinkedBlockingDeque<>();
- atomicThrowable = new AtomicReference<>();
- ignoredFiles = Sets.newHashSet();
}
@Override
public void setup(Context.OperatorContext context)
{
+ if (scanService != null) {
+ throw new RuntimeException("multiple calls to setup() detected!");
+ }
+ scanService = Executors.newSingleThreadExecutor();
+ discoveredFiles = new LinkedBlockingDeque<>();
+ atomicThrowable = new AtomicReference<>();
+ ignoredFiles = Sets.newHashSet();
sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
if (filePatternRegularExp != null) {
regex = Pattern.compile(filePatternRegularExp);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c4a11299/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
index febda3f..1b8efff 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -226,13 +226,13 @@ public class FileSplitterInputTest
FSWindowDataManager fsIdempotentStorageManager = new FSWindowDataManager();
testMeta.fileSplitterInput.setWindowDataManager(fsIdempotentStorageManager);
- testMeta.fileSplitterInput.setup(testMeta.context);
+ fsIdempotentStorageManager.setup(testMeta.context);
//will emit window 1 from data directory
testFileMetadata();
testMeta.fileMetadataSink.clear();
testMeta.blockMetadataSink.clear();
- testMeta.fileSplitterInput.setup(testMeta.context);
+ fsIdempotentStorageManager.setup(testMeta.context);
testMeta.fileSplitterInput.beginWindow(1);
Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size());
for (Object blockMetadata : testMeta.blockMetadataSink.collectedTuples) {
@@ -331,13 +331,13 @@ public class FileSplitterInputTest
testMeta.fileSplitterInput.setWindowDataManager(fsWindowDataManager);
testMeta.fileSplitterInput.setBlocksThreshold(10);
testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500);
- testMeta.fileSplitterInput.setup(testMeta.context);
+ fsWindowDataManager.setup(testMeta.context);
testBlocksThreshold();
testMeta.fileMetadataSink.clear();
testMeta.blockMetadataSink.clear();
- testMeta.fileSplitterInput.setup(testMeta.context);
+ fsWindowDataManager.setup(testMeta.context);
for (int i = 1; i < 8; i++) {
testMeta.fileSplitterInput.beginWindow(i);
}
@@ -499,7 +499,7 @@ public class FileSplitterInputTest
testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500);
- testMeta.fileSplitterInput.setup(testMeta.context);
+ fsWindowDataManager.setup(testMeta.context);
testMeta.fileSplitterInput.beginWindow(1);