You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2016/07/06 21:13:01 UTC

apex-malhar git commit: APEXMALHAR-2069: moved creation of scanService to setup, : throw a RuntimeException if scanService is already set, fix unit tests to call setup only on WindowDataManager : move the initialization of most data members from the

Repository: apex-malhar
Updated Branches:
  refs/heads/APEXMALHAR-2069 [created] c4a11299b


APEXMALHAR-2069: moved creation of scanService to setup
		: throw a RuntimeException from setup if scanService is already set; fix unit tests to call setup only on WindowDataManager
		: move the initialization of most data members from the constructor to setup for the TimeBasedDirectoryScanner class
		: move the initialization of most data members from the constructor to setup for the FileSplitterInput class
		: don't overwrite previous value of referenceTimes in case of restore


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c4a11299
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c4a11299
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c4a11299

Branch: refs/heads/APEXMALHAR-2069
Commit: c4a11299b86f9b5e2b0a44bc9057685c8f3d7663
Parents: 32840a2
Author: Sanjay Pujare <sa...@Sanjay-DT-MacBook-Pro.local>
Authored: Wed Jun 29 18:08:30 2016 -0700
Committer: Sanjay Pujare <sa...@Sanjay-DT-MacBook-Pro.local>
Committed: Wed Jul 6 12:49:17 2016 -0700

----------------------------------------------------------------------
 .../lib/io/fs/FileSplitterInput.java            | 31 ++++++++++++--------
 .../lib/io/fs/FileSplitterInputTest.java        | 10 +++----
 2 files changed, 23 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c4a11299/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index 077a4ac..b8b21cb 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -80,13 +80,13 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
 {
   @NotNull
   private WindowDataManager windowDataManager;
-  @NotNull
-  protected final transient LinkedList<ScannedFileInfo> currentWindowRecoveryState;
+
+  protected transient LinkedList<ScannedFileInfo> currentWindowRecoveryState;
 
   @Valid
   @NotNull
   private TimeBasedDirectoryScanner scanner;
-  @NotNull
+
   private Map<String, Map<String, Long>> referenceTimes;
 
   private transient long sleepMillis;
@@ -94,15 +94,17 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
   public FileSplitterInput()
   {
     super();
-    currentWindowRecoveryState = Lists.newLinkedList();
     windowDataManager = new WindowDataManager.NoopWindowDataManager();
-    referenceTimes = Maps.newHashMap();
     scanner = new TimeBasedDirectoryScanner();
   }
 
   @Override
   public void setup(Context.OperatorContext context)
   {
+    currentWindowRecoveryState = Lists.newLinkedList();
+    if (referenceTimes == null) {
+      referenceTimes = Maps.newHashMap();
+    }
     sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
     scanner.setup(context);
     windowDataManager.setup(context);
@@ -288,12 +290,12 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
 
     protected transient long lastScanMillis;
     protected transient FileSystem fs;
-    protected final transient LinkedBlockingDeque<ScannedFileInfo> discoveredFiles;
-    protected final transient ExecutorService scanService;
-    protected final transient AtomicReference<Throwable> atomicThrowable;
+    protected transient LinkedBlockingDeque<ScannedFileInfo> discoveredFiles;
+    protected transient ExecutorService scanService;
+    protected transient AtomicReference<Throwable> atomicThrowable;
 
     private transient volatile boolean running;
-    protected final transient HashSet<String> ignoredFiles;
+    protected transient HashSet<String> ignoredFiles;
 
     protected transient Pattern regex;
     private transient Pattern ignoreRegex;
@@ -309,15 +311,18 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOper
       recursive = true;
       scanIntervalMillis = DEF_SCAN_INTERVAL_MILLIS;
       files = Sets.newLinkedHashSet();
-      scanService = Executors.newSingleThreadExecutor();
-      discoveredFiles = new LinkedBlockingDeque<>();
-      atomicThrowable = new AtomicReference<>();
-      ignoredFiles = Sets.newHashSet();
     }
 
     @Override
     public void setup(Context.OperatorContext context)
     {
+      if (scanService != null) {
+        throw new RuntimeException("multiple calls to setup() detected!");
+      }
+      scanService = Executors.newSingleThreadExecutor();
+      discoveredFiles = new LinkedBlockingDeque<>();
+      atomicThrowable = new AtomicReference<>();
+      ignoredFiles = Sets.newHashSet();
       sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS);
       if (filePatternRegularExp != null) {
         regex = Pattern.compile(filePatternRegularExp);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c4a11299/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
index febda3f..1b8efff 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -226,13 +226,13 @@ public class FileSplitterInputTest
     FSWindowDataManager fsIdempotentStorageManager = new FSWindowDataManager();
     testMeta.fileSplitterInput.setWindowDataManager(fsIdempotentStorageManager);
 
-    testMeta.fileSplitterInput.setup(testMeta.context);
+    fsIdempotentStorageManager.setup(testMeta.context);
     //will emit window 1 from data directory
     testFileMetadata();
     testMeta.fileMetadataSink.clear();
     testMeta.blockMetadataSink.clear();
 
-    testMeta.fileSplitterInput.setup(testMeta.context);
+    fsIdempotentStorageManager.setup(testMeta.context);
     testMeta.fileSplitterInput.beginWindow(1);
     Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size());
     for (Object blockMetadata : testMeta.blockMetadataSink.collectedTuples) {
@@ -331,13 +331,13 @@ public class FileSplitterInputTest
     testMeta.fileSplitterInput.setWindowDataManager(fsWindowDataManager);
     testMeta.fileSplitterInput.setBlocksThreshold(10);
     testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500);
-    testMeta.fileSplitterInput.setup(testMeta.context);
+    fsWindowDataManager.setup(testMeta.context);
 
     testBlocksThreshold();
     testMeta.fileMetadataSink.clear();
     testMeta.blockMetadataSink.clear();
 
-    testMeta.fileSplitterInput.setup(testMeta.context);
+    fsWindowDataManager.setup(testMeta.context);
     for (int i = 1; i < 8; i++) {
       testMeta.fileSplitterInput.beginWindow(i);
     }
@@ -499,7 +499,7 @@ public class FileSplitterInputTest
     testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500);
 
 
-    testMeta.fileSplitterInput.setup(testMeta.context);
+    fsWindowDataManager.setup(testMeta.context);
 
     testMeta.fileSplitterInput.beginWindow(1);