You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2017/01/10 06:55:52 UTC
apex-malhar git commit: APEXMALHAR-2379 Validation of attributes for
AbstractFileOutputOperator, regex on name instead of path,
added @ Min constraint annotation
Repository: apex-malhar
Updated Branches:
refs/heads/master 91767c589 -> 16b15c21b
APEXMALHAR-2379 Validation of attributes for AbstractFileOutputOperator, regex on name instead of path, added @ Min constraint annotation
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/16b15c21
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/16b15c21
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/16b15c21
Branch: refs/heads/master
Commit: 16b15c21be47eea7ff262cd5dd7eb44d824736b5
Parents: 91767c5
Author: francisf <fr...@gmail.com>
Authored: Mon Jan 2 18:50:11 2017 +0530
Committer: francisf <fr...@gmail.com>
Committed: Mon Jan 9 12:36:40 2017 +0530
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileInputOperator.java | 17 ++++++++++++++++-
.../lib/io/fs/AbstractFileInputOperatorTest.java | 2 +-
2 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16b15c21/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index be156d1..bc5ebf1 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -19,6 +19,7 @@
package com.datatorrent.lib.io.fs;
import java.io.BufferedReader;
+import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@@ -37,6 +38,7 @@ import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
@@ -106,14 +108,17 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
protected String directory;
@NotNull
protected DirectoryScanner scanner = new DirectoryScanner();
+ @Min(0)
protected int scanIntervalMillis = 5000;
protected long offset;
protected String currentFile;
protected Set<String> processedFiles = new HashSet<String>();
+ @Min(1)
protected int emitBatchSize = 1000;
protected int currentPartitions = 1;
protected int partitionCount = 1;
private int retryCount = 0;
+ @Min(0)
private int maxRetryCount = 5;
protected transient long skipCount = 0;
private transient OperatorContext context;
@@ -366,6 +371,9 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
*/
public void setScanIntervalMillis(int scanIntervalMillis)
{
+ if (scanIntervalMillis < 0) {
+ throw new IllegalArgumentException("scanIntervalMillis should be greater than or equal to 0.");
+ }
this.scanIntervalMillis = scanIntervalMillis;
}
@@ -384,6 +392,9 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
*/
public void setEmitBatchSize(int emitBatchSize)
{
+ if (emitBatchSize <= 0) {
+ throw new IllegalArgumentException("emitBatchSize should be greater than 0.");
+ }
this.emitBatchSize = emitBatchSize;
}
@@ -994,6 +1005,9 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
*/
public void setMaxRetryCount(int maxRetryCount)
{
+ if (maxRetryCount < 0) {
+ throw new IllegalArgumentException("maxRetryCount should be greater than or equal to 0.");
+ }
this.maxRetryCount = maxRetryCount;
}
@@ -1100,7 +1114,8 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
}
Pattern regex = this.getRegex();
if (regex != null) {
- Matcher matcher = regex.matcher(filePathStr);
+ String fileName = new File(filePathStr).getName();
+ Matcher matcher = regex.matcher(fileName);
if (!matcher.matches()) {
return false;
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16b15c21/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
index 1a97f5e..5ceac01 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
@@ -130,7 +130,7 @@ public class AbstractFileInputOperatorTest
oper.output.setSink(sink);
oper.setDirectory(testMeta.dir);
- oper.getScanner().setFilePatternRegexp(".*file[\\d]");
+ oper.getScanner().setFilePatternRegexp("((?!target).)*file[\\d]");
oper.getScanner().setRecursive(recursive);
oper.setup(testMeta.context);