Posted to commits@apex.apache.org by pr...@apache.org on 2016/10/24 11:44:02 UTC

apex-malhar git commit: APEXMALHAR-2237 Changes in FSInputModule to support Dynamic partitioning.

Repository: apex-malhar
Updated Branches:
  refs/heads/master c92ca15e8 -> 37991576d


APEXMALHAR-2237 Changes in FSInputModule to support Dynamic partitioning.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/37991576
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/37991576
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/37991576

Branch: refs/heads/master
Commit: 37991576d826c2ac30a077b0774a3fbada3c1f9c
Parents: c92ca15
Author: yogidevendra <yo...@apache.org>
Authored: Wed Sep 21 12:29:06 2016 +0530
Committer: yogidevendra <yo...@apache.org>
Committed: Mon Oct 24 14:09:41 2016 +0530

----------------------------------------------------------------------
 apps/filecopy/src/site/conf/app-conf.xml        | 22 +----
 .../lib/io/block/AbstractBlockReader.java       |  4 +-
 .../datatorrent/lib/io/fs/FSInputModule.java    | 98 +++++++++++++++-----
 3 files changed, 80 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
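
In short, the module-level knobs change as in the sketch below (illustrative only, not part of the patch; the module instance and the values are placeholders). The fixed readersCount/StatelessPartitioner configuration is replaced by min/max reader bounds plus a repartition check interval on the block reader.

import com.datatorrent.lib.io.fs.FSInputModule;

public class DynamicPartitioningSketch
{
  public static void configure(FSInputModule input)
  {
    // Before this commit: input.setReadersCount(4) pinned the block-reader
    // partition count via a StatelessPartitioner.
    // After this commit: the reader scales itself between the bounds below,
    // re-evaluating its backlog at the given interval.
    input.setMinReaders(1);
    input.setMaxReaders(4);
    input.setRepartitionCheckInterval(2 * 60 * 1000L); // milliseconds
  }
}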


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/37991576/apps/filecopy/src/site/conf/app-conf.xml
----------------------------------------------------------------------
diff --git a/apps/filecopy/src/site/conf/app-conf.xml b/apps/filecopy/src/site/conf/app-conf.xml
index 964ad7d..6f995bf 100644
--- a/apps/filecopy/src/site/conf/app-conf.xml
+++ b/apps/filecopy/src/site/conf/app-conf.xml
@@ -23,26 +23,14 @@
 <configuration>
   <property>
     <name>dt.operator.HDFSInputModule.prop.files</name>
-    <value>hdfs://localhost:54310/user/dtadmin/input</value>
-  </property>
-  <property>
-    <name>dt.operator.HDFSInputModule.prop.blockSize</name>
-    <value>128000000</value>
-  </property>
-  <property>
-    <name>dt.operator.HDFSInputModule.prop.scanIntervalMillis</name>
-    <value>10000</value>
-  </property>
-  <property>
-    <name>dt.operator.HDFSInputModule.prop.dedup</name>
-    <value>true</value>
+    <value>hdfs://localhost:54310/user/appuser/input</value>
   </property>
   <property>
     <name>dt.operator.HDFSFileCopyModule.prop.outputDirectoryPath</name>
-    <value>hdfs://localhost:54310/user/dtadmin/output</value>
+    <value>hdfs://localhost:54310/user/appuser/output</value>
   </property>
   <property>
-    <name>dt.loggers.level</name>
-    <value>com.datatorrent.*:DEBUG,org.apache.*:INFO</value>
-</property>
+    <name>dt.operator.HDFSInputModule.prop.blocksThreshold</name>
+    <value>1</value>
+  </property>
 </configuration>
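
For reference, a rough Java equivalent of the sample configuration above, assuming the usual bean setters behind the dt.operator.HDFSInputModule.prop.* keys; the values for the dropped properties are the ones from the old sample. This sketch is not part of the patch.

import com.datatorrent.lib.io.fs.FSInputModule;

public class AppConfSketch
{
  public static void configure(FSInputModule input)
  {
    // Same settings as the dt.operator.HDFSInputModule.prop.* entries above.
    input.setFiles("hdfs://localhost:54310/user/appuser/input");
    input.setBlocksThreshold(1);
    // Dropped from the sample conf, but still settable on the module:
    input.setBlockSize(128000000L);
    input.setScanIntervalMillis(10000);
  }
}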

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/37991576/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
index 268a17b..734df38 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
@@ -115,7 +115,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
    */
   protected int minReaders;
   /**
-   * Interval at which stats are processed. Default : 1 minute
+   * Interval at which stats are processed. Default : 2 minutes
    */
   protected long intervalMillis;
 
@@ -147,7 +147,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
   {
     maxReaders = 16;
     minReaders = 1;
-    intervalMillis = 60 * 1000L;
+    intervalMillis = 2 * 60 * 1000L;
     response = new StatsListener.Response();
     backlogPerOperator = Maps.newHashMap();
     partitionCount = 1;
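
A minimal sketch, not part of this commit, of tuning these defaults directly on a concrete reader (assuming FSSliceReader from com.datatorrent.lib.io.block; the setters are the ones FSInputModule calls below, the values are illustrative):

import com.datatorrent.lib.io.block.FSSliceReader;

public class ReaderTuningSketch
{
  public static FSSliceReader tunedReader()
  {
    FSSliceReader reader = new FSSliceReader();
    reader.setMinReaders(2);              // lower bound on reader partitions
    reader.setMaxReaders(8);              // upper bound on reader partitions
    reader.setIntervalMillis(30 * 1000L); // check backlog every 30s instead of the 2-minute default
    return reader;
  }
}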

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/37991576/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
index d71111c..6d54756 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.Module;
-import com.datatorrent.common.partitioner.StatelessPartitioner;
 import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
 import com.datatorrent.lib.io.block.AbstractBlockReader;
 import com.datatorrent.lib.io.block.BlockMetadata;
@@ -41,9 +40,11 @@ import com.datatorrent.netlet.util.Slice;
  * 3. scanIntervalMillis: interval between two scans to discover new files in input directory<br/>
  * 4. recursive: if scan recursively input directories<br/>
  * 5. blockSize: block size used to read input blocks of file<br/>
- * 6. readersCount: count of readers to read input file<br/>
- * 7. sequentialFileRead: If emit file blocks in sequence?<br/>
- * 8. blocksThreshold: number of blocks emitted per window
+ * 6. sequentialFileRead: whether to emit file blocks in sequence<br/>
+ * 7. blocksThreshold: number of blocks emitted per window
+ * 8. minReaders: Minimum number of block readers for dynamic partitioning
+ * 9. maxReaders: Maximum number of block readers for dynamic partitioning
+ * 10. repartitionCheckInterval: Interval for re-evaluating dynamic partitioning
  *
  * @since 3.5.0
  */
@@ -59,9 +60,11 @@ public class FSInputModule implements Module
   private boolean recursive = true;
   private long blockSize;
   private boolean sequentialFileRead = false;
-  private int readersCount;
   @Min(1)
   protected int blocksThreshold;
+  protected int minReaders;
+  protected int maxReaders;
+  protected long repartitionCheckInterval;
 
   public final transient ProxyOutputPort<AbstractFileSplitter.FileMetadata> filesMetadataOutput = new ProxyOutputPort<>();
   public final transient ProxyOutputPort<BlockMetadata.FileBlockMetadata> blocksMetadataOutput = new ProxyOutputPort<>();
@@ -108,8 +111,17 @@ public class FSInputModule implements Module
     }
 
     blockReader.setBasePath(files);
-    if (readersCount != 0) {
-      dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<FSSliceReader>(readersCount));
+
+    if (minReaders != 0) {
+      blockReader.setMinReaders(minReaders);
+    }
+
+    if (maxReaders != 0) {
+      blockReader.setMaxReaders(maxReaders);
+    }
+
+    if (repartitionCheckInterval != 0) {
+      blockReader.setIntervalMillis(repartitionCheckInterval);
     }
     fileSplitter.setBlocksThreshold(blocksThreshold);
   }
@@ -218,24 +230,6 @@ public class FSInputModule implements Module
   }
 
   /**
-   * Gets readers count
-   * @return readersCount
-   */
-  public int getReadersCount()
-  {
-    return readersCount;
-  }
-
-  /**
-   * Static count of readers to read input file
-   * @param readersCount
-   */
-  public void setReadersCount(int readersCount)
-  {
-    this.readersCount = readersCount;
-  }
-
-  /**
    * Gets is sequential file read
    *
    * @return sequentialFileRead
@@ -277,6 +271,60 @@ public class FSInputModule implements Module
     return blocksThreshold;
   }
 
+  /**
+   * Gets minimum number of block readers for dynamic partitioning.
+   * @return minimum instances of block reader.
+   */
+  public int getMinReaders()
+  {
+    return minReaders;
+  }
+
+  /**
+   * Sets minimum number of block readers for dynamic partitioning.
+   * @param minReaders minimum number of readers.
+   */
+  public void setMinReaders(int minReaders)
+  {
+    this.minReaders = minReaders;
+  }
+
+  /**
+   * Gets maximum number of block readers for dynamic partitioning.
+   * @return maximum instances of block reader.
+   */
+  public int getMaxReaders()
+  {
+    return maxReaders;
+  }
+
+  /**
+   * Sets maximum number of block readers for dynamic partitioning.
+   * @param maxReaders maximum number of readers.
+   */
+  public void setMaxReaders(int maxReaders)
+  {
+    this.maxReaders = maxReaders;
+  }
+
+  /**
+   * Gets the interval for re-evaluating dynamic partitioning.
+   * @return interval for re-evaluating dynamic partitioning
+   */
+  public long getRepartitionCheckInterval()
+  {
+    return repartitionCheckInterval;
+  }
+
+  /**
+   * Sets the interval for re-evaluating dynamic partitioning.
+   * @param repartitionCheckInterval interval for re-evaluating dynamic partitioning
+   */
+  public void setRepartitionCheckInterval(long repartitionCheckInterval)
+  {
+    this.repartitionCheckInterval = repartitionCheckInterval;
+  }
+
   public static class SequentialFileBlockMetadataCodec
       extends KryoSerializableStreamCodec<BlockMetadata.FileBlockMetadata>
   {
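
Finally, a self-contained usage sketch of the new module API (not from this commit; the application name, the values, and the ConsoleOutputOperator sink are placeholder choices):

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.io.fs.FSInputModule;

public class HDFSScanApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    FSInputModule input = dag.addModule("HDFSInputModule", new FSInputModule());
    input.setFiles("hdfs://localhost:54310/user/appuser/input");
    input.setBlocksThreshold(1);
    input.setMinReaders(1);
    input.setMaxReaders(16);
    input.setRepartitionCheckInterval(2 * 60 * 1000L);

    // Placeholder sink: print the block metadata emitted by the module.
    ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());
    dag.addStream("blocksMetadata", input.blocksMetadataOutput, console.input);
  }
}

The min/max bounds and repartitionCheckInterval are simply forwarded to the underlying block reader, as the populateDAG changes in this commit show.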