You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by cs...@apache.org on 2016/06/01 02:01:51 UTC

[1/2] incubator-apex-malhar git commit: APEXMALHAR-2096: Add property blocksThreshold to limit input rate

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master a14b48716 -> 127ffe752


APEXMALHAR-2096: Add property blocksThreshold to limit input rate


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/703ff065
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/703ff065
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/703ff065

Branch: refs/heads/master
Commit: 703ff065a1cafc2de0120bb12818e4920ce86f27
Parents: a14b487
Author: Priyanka Gugale <pr...@datatorrent.com>
Authored: Wed May 18 17:52:08 2016 -0700
Committer: Priyanka Gugale <pr...@datatorrent.com>
Committed: Tue May 31 17:35:01 2016 -0700

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileSplitter.java         | 17 +++++++----
 .../datatorrent/lib/io/fs/FSInputModule.java    | 31 +++++++++++++++++---
 .../lib/io/fs/FSInputModuleAppTest.java         |  1 +
 .../lib/io/fs/FileSplitterBaseTest.java         |  5 +++-
 .../lib/io/fs/FileSplitterInputTest.java        |  1 +
 5 files changed, 45 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/703ff065/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
index 2ea961e..7e6bd2f 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
@@ -71,11 +71,6 @@ public abstract class AbstractFileSplitter extends BaseOperator
   public final transient DefaultOutputPort<BlockMetadata.FileBlockMetadata> blocksMetadataOutput =
       new DefaultOutputPort<>();
 
-  public AbstractFileSplitter()
-  {
-    blocksThreshold = Integer.MAX_VALUE;
-  }
-
   @Override
   public void setup(Context.OperatorContext context)
   {
@@ -280,11 +275,23 @@ public abstract class AbstractFileSplitter extends BaseOperator
     return blockSize;
   }
 
+  /**
+   * Sets number of blocks to be emitted per window.<br/>
+   * A lot of blocks emitted per window can overwhelm the downstream operators. Set this value considering blockSize and
+   * readersCount.
+   * @param threshold number of blocks to be emitted per window
+   */
   public void setBlocksThreshold(int threshold)
   {
     this.blocksThreshold = threshold;
   }
 
+  /**
+   * Gets number of blocks to be emitted per window.<br/>
+   * A lot of blocks emitted per window can overwhelm the downstream operators. Set this value considering blockSize and
+   * readersCount.
+   * @return number of blocks to be emitted per window
+   */
   public int getBlocksThreshold()
   {
     return blocksThreshold;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/703ff065/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
index 713e745..e8af9aa 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
@@ -42,9 +42,8 @@ import com.datatorrent.netlet.util.Slice;
  * 4. recursive: if scan recursively input directories<br/>
  * 5. blockSize: block size used to read input blocks of file<br/>
  * 6. readersCount: count of readers to read input file<br/>
- * 7. sequencialFileRead: If emit file blocks in sequence?
- *
- * @since 3.4.0
+ * 7. sequencialFileRead: whether to emit file blocks in sequence<br/>
+ * 8. blocksThreshold: number of blocks emitted per window
  */
 
 public class FSInputModule implements Module
@@ -59,6 +58,8 @@ public class FSInputModule implements Module
   private long blockSize;
   private boolean sequencialFileRead = false;
   private int readersCount;
+  @Min(1)
+  protected int blocksThreshold;
 
   public final transient ProxyOutputPort<AbstractFileSplitter.FileMetadata> filesMetadataOutput = new ProxyOutputPort<>();
   public final transient ProxyOutputPort<BlockMetadata.FileBlockMetadata> blocksMetadataOutput = new ProxyOutputPort<>();
@@ -107,8 +108,8 @@ public class FSInputModule implements Module
     blockReader.setBasePath(files);
     if (readersCount != 0) {
       dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<FSSliceReader>(readersCount));
-      fileSplitter.setBlocksThreshold(readersCount);
     }
+    fileSplitter.setBlocksThreshold(blocksThreshold);
   }
 
   /**
@@ -252,6 +253,28 @@ public class FSInputModule implements Module
     this.sequencialFileRead = sequencialFileRead;
   }
 
+  /**
+   * Sets number of blocks to be emitted per window.<br/>
+   * A lot of blocks emitted per window can overwhelm the downstream operators. Set this value considering blockSize and
+   * readersCount.
+   * @param threshold number of blocks to be emitted per window
+   */
+  public void setBlocksThreshold(int threshold)
+  {
+    this.blocksThreshold = threshold;
+  }
+
+  /**
+   * Gets number of blocks to be emitted per window.<br/>
+   * A lot of blocks emitted per window can overwhelm the downstream operators. Set this value considering blockSize and
+   * readersCount.
+   * @return number of blocks to be emitted per window
+   */
+  public int getBlocksThreshold()
+  {
+    return blocksThreshold;
+  }
+
   public static class SequentialFileBlockMetadataCodec
       extends KryoSerializableStreamCodec<BlockMetadata.FileBlockMetadata>
   {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/703ff065/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
index 19ab84f..4213a00 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java
@@ -97,6 +97,7 @@ public class FSInputModuleAppTest
     Configuration conf = new Configuration(false);
     conf.set("dt.operator.hdfsInputModule.prop.files", inputDir);
     conf.set("dt.operator.hdfsInputModule.prop.blockSize", "10");
+    conf.set("dt.operator.hdfsInputModule.prop.blocksThreshold", "4");
     conf.set("dt.operator.hdfsInputModule.prop.scanIntervalMillis", "10000");
 
     LocalMode lma = LocalMode.newInstance();

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/703ff065/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
index ac8c7e2..862e589 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java
@@ -80,6 +80,7 @@ public class FileSplitterBaseTest
       }
 
       fileSplitter = new FileSplitterBase();
+      fileSplitter.setBlocksThreshold(100);
       fileSplitter.setFile(this.dataDirectory);
 
       Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
@@ -199,7 +200,9 @@ public class FileSplitterBaseTest
   {
     LocalMode lma = LocalMode.newInstance();
     SplitterApp app = new SplitterApp();
-    lma.prepareDAG(app, new Configuration());
+    Configuration appConf = new Configuration();
+    appConf.set("dt.operator.Splitter.prop.blocksThreshold", "4");
+    lma.prepareDAG(app, appConf);
     lma.cloneDAG(); // check serialization
     LocalMode.Controller lc = lma.getController();
     lc.runAsync();

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/703ff065/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
index faa1d45..cea5109 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -104,6 +104,7 @@ public class FileSplitterInputTest
       }
 
       fileSplitterInput = new FileSplitterInput();
+      fileSplitterInput.setBlocksThreshold(100);
       scanner = new MockScanner();
       fileSplitterInput.setScanner(scanner);
       fileSplitterInput.getScanner().setScanIntervalMillis(500);


[2/2] incubator-apex-malhar git commit: Merge branch 'APEXMALHAR-2096'

Posted by cs...@apache.org.
Merge branch 'APEXMALHAR-2096'


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/127ffe75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/127ffe75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/127ffe75

Branch: refs/heads/master
Commit: 127ffe75226abcf91aa5234713ca5f99bc76c46f
Parents: a14b487 703ff06
Author: Chandni Singh <cs...@apache.org>
Authored: Tue May 31 18:57:29 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Tue May 31 18:57:29 2016 -0700

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileSplitter.java         | 17 +++++++----
 .../datatorrent/lib/io/fs/FSInputModule.java    | 31 +++++++++++++++++---
 .../lib/io/fs/FSInputModuleAppTest.java         |  1 +
 .../lib/io/fs/FileSplitterBaseTest.java         |  5 +++-
 .../lib/io/fs/FileSplitterInputTest.java        |  1 +
 5 files changed, 45 insertions(+), 10 deletions(-)
----------------------------------------------------------------------