You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@apex.apache.org by cs...@apache.org on 2016/06/01 02:01:51 UTC
[1/2] incubator-apex-malhar git commit: APEXMALHAR-2096: Add property
blocksThreshold to limit input rate
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/master a14b48716 -> 127ffe752
APEXMALHAR-2096: Add property blocksThreshold to limit input rate
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/703ff065
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/703ff065
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/703ff065
Branch: refs/heads/master
Commit: 703ff065a1cafc2de0120bb12818e4920ce86f27
Parents: a14b487
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Wed May 18 17:52:08 2016 -0700
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Tue May 31 17:35:01 2016 -0700
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileSplitter.java | 17 +++++++----
.../datatorrent/lib/io/fs/FSInputModule.java | 31 +++++++++++++++++---
.../lib/io/fs/FSInputModuleAppTest.java | 1 +
.../lib/io/fs/FileSplitterBaseTest.java | 5 +++-
.../lib/io/fs/FileSplitterInputTest.java | 1 +
5 files changed, 45 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/703ff065/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
index 2ea961e..7e6bd2f 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
@@ -71,11 +71,6 @@ public abstract class AbstractFileSplitter extends BaseOperator
public final transient DefaultOutputPort<BlockMetadata.FileBlockMetadata> blocksMetadataOutput =
new DefaultOutputPort<>();
- public AbstractFileSplitter()
- {
- blocksThreshold = Integer.MAX_VALUE;
- }
-
@Override
public void setup(Context.OperatorContext context)
{
@@ -280,11 +275,23 @@ public abstract class AbstractFileSplitter extends BaseOperator
return blockSize;
}
+ /**
+ * Sets the maximum number of blocks to be emitted per window.<br/>
+ * Emitting too many blocks per window can overwhelm downstream operators. Choose this value based on blockSize and
+ * readersCount.
+ * @param threshold maximum number of blocks to emit per window
+ */
public void setBlocksThreshold(int threshold)
{
this.blocksThreshold = threshold;
}
+ /**
+ * Gets the maximum number of blocks to be emitted per window.<br/>
+ * Emitting too many blocks per window can overwhelm downstream operators. Choose this value based on blockSize and
+ * readersCount.
+ * @return maximum number of blocks to emit per window
+ */
public int getBlocksThreshold()
{
return blocksThreshold;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/703ff065/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
index 713e745..e8af9aa 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
@@ -42,9 +42,8 @@ import com.datatorrent.netlet.util.Slice;
* 4. recursive: if scan recursively input directories<br/>
* 5. blockSize: block size used to read input blocks of file<br/>
* 6. readersCount: count of readers to read input file<br/>
- * 7. sequencialFileRead: If emit file blocks in sequence?
- *
- * @since 3.4.0
+ * 7. sequencialFileRead: whether to emit file blocks in sequence<br/>
+ * 8. blocksThreshold: maximum number of blocks emitted per window
*/
public class FSInputModule implements Module
@@ -59,6 +58,8 @@ public class FSInputModule implements Module
private long blockSize;
private boolean sequencialFileRead = false;
private int readersCount;
+ @Min(1)
+ protected int blocksThreshold;
public final transient ProxyOutputPort<AbstractFileSplitter.FileMetadata> filesMetadataOutput = new ProxyOutputPort<>();
public final transient ProxyOutputPort<BlockMetadata.FileBlockMetadata> blocksMetadataOutput = new ProxyOutputPort<>();
@@ -107,8 +108,8 @@ public class FSInputModule implements Module
blockReader.setBasePath(files);
if (readersCount != 0) {
dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<FSSliceReader>(readersCount));
- fileSplitter.setBlocksThreshold(readersCount);
}
+ fileSplitter.setBlocksThreshold(blocksThreshold);
}
/**
@@ -252,6 +253,28 @@ public class FSInputModule implements Module
this.sequencialFileRead = sequencialFileRead;
}
+ /**
+ * Sets the maximum number of blocks to be emitted per window.<br/>
+ * Emitting too many blocks per window can overwhelm downstream operators. Choose this value based on blockSize and
+ * readersCount.
+ * @param threshold maximum number of blocks to emit per window
+ */
+ public void setBlocksThreshold(int threshold)
+ {
+ this.blocksThreshold = threshold;
+ }
+
+ /**
+ * Gets the maximum number of blocks to be emitted per window.<br/>
+ * Emitting too many blocks per window can overwhelm downstream operators. Choose this value based on blockSize and
+ * readersCount.
+ * @return maximum number of blocks to emit per window
+ */
+ public int getBlocksThreshold()
+ {
+ return blocksThreshold;
+ }
+
public static class SequentialFileBlockMetadataCodec
extends KryoSerializableStreamCodec<BlockMetadata.FileBlockMetadata>
{
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/703ff065/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
index 19ab84f..4213a00 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
@@ -97,6 +97,7 @@ public class FSInputModuleAppTest
Configuration conf = new Configuration(false);
conf.set("dt.operator.hdfsInputModule.prop.files", inputDir);
conf.set("dt.operator.hdfsInputModule.prop.blockSize", "10");
+ conf.set("dt.operator.hdfsInputModule.prop.blocksThreshold", "4");
conf.set("dt.operator.hdfsInputModule.prop.scanIntervalMillis", "10000");
LocalMode lma = LocalMode.newInstance();
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/703ff065/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
index ac8c7e2..862e589 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
@@ -80,6 +80,7 @@ public class FileSplitterBaseTest
}
fileSplitter = new FileSplitterBase();
+ fileSplitter.setBlocksThreshold(100);
fileSplitter.setFile(this.dataDirectory);
Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
@@ -199,7 +200,9 @@ public class FileSplitterBaseTest
{
LocalMode lma = LocalMode.newInstance();
SplitterApp app = new SplitterApp();
- lma.prepareDAG(app, new Configuration());
+ Configuration appConf = new Configuration();
+ appConf.set("dt.operator.Splitter.prop.blocksThreshold", "4");
+ lma.prepareDAG(app, appConf);
lma.cloneDAG(); // check serialization
LocalMode.Controller lc = lma.getController();
lc.runAsync();
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/703ff065/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
index faa1d45..cea5109 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -104,6 +104,7 @@ public class FileSplitterInputTest
}
fileSplitterInput = new FileSplitterInput();
+ fileSplitterInput.setBlocksThreshold(100);
scanner = new MockScanner();
fileSplitterInput.setScanner(scanner);
fileSplitterInput.getScanner().setScanIntervalMillis(500);
[2/2] incubator-apex-malhar git commit: Merge branch 'APEXMALHAR-2096'
Posted by cs...@apache.org.
Merge branch 'APEXMALHAR-2096'
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/127ffe75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/127ffe75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/127ffe75
Branch: refs/heads/master
Commit: 127ffe75226abcf91aa5234713ca5f99bc76c46f
Parents: a14b487 703ff06
Author: Chandni Singh <cs...@apache.org>
Authored: Tue May 31 18:57:29 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Tue May 31 18:57:29 2016 -0700
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileSplitter.java | 17 +++++++----
.../datatorrent/lib/io/fs/FSInputModule.java | 31 +++++++++++++++++---
.../lib/io/fs/FSInputModuleAppTest.java | 1 +
.../lib/io/fs/FileSplitterBaseTest.java | 5 +++-
.../lib/io/fs/FileSplitterInputTest.java | 1 +
5 files changed, 45 insertions(+), 10 deletions(-)
----------------------------------------------------------------------