You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/01/10 06:55:52 UTC

apex-malhar git commit: APEXMALHAR-2379 Validation of attributes for AbstractFileOutputOperator, regex on name instead of path, added @ Min constraint annotation

Repository: apex-malhar
Updated Branches:
  refs/heads/master 91767c589 -> 16b15c21b


APEXMALHAR-2379 Validation of attributes for AbstractFileOutputOperator, regex on name instead of path, added @ Min constraint annotation


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/16b15c21
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/16b15c21
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/16b15c21

Branch: refs/heads/master
Commit: 16b15c21be47eea7ff262cd5dd7eb44d824736b5
Parents: 91767c5
Author: francisf <fr...@gmail.com>
Authored: Mon Jan 2 18:50:11 2017 +0530
Committer: francisf <fr...@gmail.com>
Committed: Mon Jan 9 12:36:40 2017 +0530

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileInputOperator.java       | 17 ++++++++++++++++-
 .../lib/io/fs/AbstractFileInputOperatorTest.java   |  2 +-
 2 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16b15c21/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index be156d1..bc5ebf1 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -19,6 +19,7 @@
 package com.datatorrent.lib.io.fs;
 
 import java.io.BufferedReader;
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -37,6 +38,7 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
 
 import org.slf4j.Logger;
@@ -106,14 +108,17 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
   protected String directory;
   @NotNull
   protected DirectoryScanner scanner = new DirectoryScanner();
+  @Min(0)
   protected int scanIntervalMillis = 5000;
   protected long offset;
   protected String currentFile;
   protected Set<String> processedFiles = new HashSet<String>();
+  @Min(1)
   protected int emitBatchSize = 1000;
   protected int currentPartitions = 1;
   protected int partitionCount = 1;
   private int retryCount = 0;
+  @Min(0)
   private int maxRetryCount = 5;
   protected transient long skipCount = 0;
   private transient OperatorContext context;
@@ -366,6 +371,9 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
    */
   public void setScanIntervalMillis(int scanIntervalMillis)
   {
+    if (scanIntervalMillis < 0) {
+      throw new IllegalArgumentException("scanIntervalMillis should be greater than or equal to 0.");
+    }
     this.scanIntervalMillis = scanIntervalMillis;
   }
 
@@ -384,6 +392,9 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
    */
   public void setEmitBatchSize(int emitBatchSize)
   {
+    if (emitBatchSize <= 0) {
+      throw new IllegalArgumentException("emitBatchSize should be greater than 0.");
+    }
     this.emitBatchSize = emitBatchSize;
   }
 
@@ -994,6 +1005,9 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
    */
   public void setMaxRetryCount(int maxRetryCount)
   {
+    if (maxRetryCount < 0) {
+      throw new IllegalArgumentException("maxRetryCount should be greater than or equal to 0.");
+    }
     this.maxRetryCount = maxRetryCount;
   }
 
@@ -1100,7 +1114,8 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
       }
       Pattern regex = this.getRegex();
       if (regex != null) {
-        Matcher matcher = regex.matcher(filePathStr);
+        String fileName = new File(filePathStr).getName();
+        Matcher matcher = regex.matcher(fileName);
         if (!matcher.matches()) {
           return false;
         }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16b15c21/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
index 1a97f5e..5ceac01 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
@@ -130,7 +130,7 @@ public class AbstractFileInputOperatorTest
     oper.output.setSink(sink);
 
     oper.setDirectory(testMeta.dir);
-    oper.getScanner().setFilePatternRegexp(".*file[\\d]");
+    oper.getScanner().setFilePatternRegexp("((?!target).)*file[\\d]");
     oper.getScanner().setRecursive(recursive);
 
     oper.setup(testMeta.context);