You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2016/10/24 11:44:02 UTC
apex-malhar git commit: APEXMALHAR-2237 Changes in FSInputModule to
support Dynamic partitioning.
Repository: apex-malhar
Updated Branches:
refs/heads/master c92ca15e8 -> 37991576d
APEXMALHAR-2237 Changes in FSInputModule to support Dynamic partitioning.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/37991576
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/37991576
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/37991576
Branch: refs/heads/master
Commit: 37991576d826c2ac30a077b0774a3fbada3c1f9c
Parents: c92ca15
Author: yogidevendra <yo...@apache.org>
Authored: Wed Sep 21 12:29:06 2016 +0530
Committer: yogidevendra <yo...@apache.org>
Committed: Mon Oct 24 14:09:41 2016 +0530
----------------------------------------------------------------------
apps/filecopy/src/site/conf/app-conf.xml | 22 +----
.../lib/io/block/AbstractBlockReader.java | 4 +-
.../datatorrent/lib/io/fs/FSInputModule.java | 98 +++++++++++++++-----
3 files changed, 80 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/37991576/apps/filecopy/src/site/conf/app-conf.xml
----------------------------------------------------------------------
diff --git a/apps/filecopy/src/site/conf/app-conf.xml b/apps/filecopy/src/site/conf/app-conf.xml
index 964ad7d..6f995bf 100644
--- a/apps/filecopy/src/site/conf/app-conf.xml
+++ b/apps/filecopy/src/site/conf/app-conf.xml
@@ -23,26 +23,14 @@
<configuration>
<property>
<name>dt.operator.HDFSInputModule.prop.files</name>
- <value>hdfs://localhost:54310/user/dtadmin/input</value>
- </property>
- <property>
- <name>dt.operator.HDFSInputModule.prop.blockSize</name>
- <value>128000000</value>
- </property>
- <property>
- <name>dt.operator.HDFSInputModule.prop.scanIntervalMillis</name>
- <value>10000</value>
- </property>
- <property>
- <name>dt.operator.HDFSInputModule.prop.dedup</name>
- <value>true</value>
+ <value>hdfs://localhost:54310/user/appuser/input</value>
</property>
<property>
<name>dt.operator.HDFSFileCopyModule.prop.outputDirectoryPath</name>
- <value>hdfs://localhost:54310/user/dtadmin/output</value>
+ <value>hdfs://localhost:54310/user/appuser/output</value>
</property>
<property>
- <name>dt.loggers.level</name>
- <value>com.datatorrent.*:DEBUG,org.apache.*:INFO</value>
-</property>
+ <name>dt.operator.HDFSInputModule.prop.blocksThreshold</name>
+ <value>1</value>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/37991576/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
index 268a17b..734df38 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
@@ -115,7 +115,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
*/
protected int minReaders;
/**
- * Interval at which stats are processed. Default : 1 minute
+ * Interval at which stats are processed. Default : 2 minutes
*/
protected long intervalMillis;
@@ -147,7 +147,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
{
maxReaders = 16;
minReaders = 1;
- intervalMillis = 60 * 1000L;
+ intervalMillis = 2 * 60 * 1000L;
response = new StatsListener.Response();
backlogPerOperator = Maps.newHashMap();
partitionCount = 1;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/37991576/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
index d71111c..6d54756 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Module;
-import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
import com.datatorrent.lib.io.block.AbstractBlockReader;
import com.datatorrent.lib.io.block.BlockMetadata;
@@ -41,9 +40,11 @@ import com.datatorrent.netlet.util.Slice;
* 3. scanIntervalMillis: interval between two scans to discover new files in input directory<br/>
* 4. recursive: if scan recursively input directories<br/>
* 5. blockSize: block size used to read input blocks of file<br/>
- * 6. readersCount: count of readers to read input file<br/>
- * 7. sequentialFileRead: If emit file blocks in sequence?<br/>
- * 8. blocksThreshold: number of blocks emitted per window
+ * 6. sequentialFileRead: If emit file blocks in sequence?<br/>
+ * 7. blocksThreshold: number of blocks emitted per window<br/>
+ * 8. minReaders: Minimum number of block readers for dynamic partitioning<br/>
+ * 9. maxReaders: Maximum number of block readers for dynamic partitioning<br/>
+ * 10. repartitionCheckInterval: Interval (in milliseconds) for re-evaluating dynamic partitioning
*
* @since 3.5.0
*/
@@ -59,9 +60,11 @@ public class FSInputModule implements Module
private boolean recursive = true;
private long blockSize;
private boolean sequentialFileRead = false;
- private int readersCount;
@Min(1)
protected int blocksThreshold;
+ protected int minReaders;
+ protected int maxReaders;
+ protected long repartitionCheckInterval;
public final transient ProxyOutputPort<AbstractFileSplitter.FileMetadata> filesMetadataOutput = new ProxyOutputPort<>();
public final transient ProxyOutputPort<BlockMetadata.FileBlockMetadata> blocksMetadataOutput = new ProxyOutputPort<>();
@@ -108,8 +111,17 @@ public class FSInputModule implements Module
}
blockReader.setBasePath(files);
- if (readersCount != 0) {
- dag.setAttribute(blockReader, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<FSSliceReader>(readersCount));
+
+ if (minReaders != 0) {
+ blockReader.setMinReaders(minReaders);
+ }
+
+ if (maxReaders != 0) {
+ blockReader.setMaxReaders(maxReaders);
+ }
+
+ if (repartitionCheckInterval != 0) {
+ blockReader.setIntervalMillis(repartitionCheckInterval);
}
fileSplitter.setBlocksThreshold(blocksThreshold);
}
@@ -218,24 +230,6 @@ public class FSInputModule implements Module
}
/**
- * Gets readers count
- * @return readersCount
- */
- public int getReadersCount()
- {
- return readersCount;
- }
-
- /**
- * Static count of readers to read input file
- * @param readersCount
- */
- public void setReadersCount(int readersCount)
- {
- this.readersCount = readersCount;
- }
-
- /**
* Gets is sequential file read
*
* @return sequentialFileRead
@@ -277,6 +271,60 @@ public class FSInputModule implements Module
return blocksThreshold;
}
+ /**
+ * Gets minimum number of block readers for dynamic partitioning.
+ * @return minimum instances of block reader.
+ */
+ public int getMinReaders()
+ {
+ return minReaders;
+ }
+
+ /**
+ * Sets minimum number of block readers for dynamic partitioning.
+ * @param minReaders minimum number of readers.
+ */
+ public void setMinReaders(int minReaders)
+ {
+ this.minReaders = minReaders;
+ }
+
+ /**
+ * Gets maximum number of block readers for dynamic partitioning.
+ * @return maximum instances of block reader.
+ */
+ public int getMaxReaders()
+ {
+ return maxReaders;
+ }
+
+ /**
+ * Sets maximum number of block readers for dynamic partitioning.
+ * @param maxReaders maximum number of readers.
+ */
+ public void setMaxReaders(int maxReaders)
+ {
+ this.maxReaders = maxReaders;
+ }
+
+ /**
+ * Gets the interval, in milliseconds, at which dynamic partitioning is re-evaluated.
+ * @return interval in milliseconds for re-evaluating dynamic partitioning
+ */
+ public long getRepartitionCheckInterval()
+ {
+ return repartitionCheckInterval;
+ }
+
+ /**
+ * Sets the interval, in milliseconds, at which dynamic partitioning is re-evaluated.
+ * @param repartitionCheckInterval interval in milliseconds for re-evaluating dynamic partitioning
+ */
+ public void setRepartitionCheckInterval(long repartitionCheckInterval)
+ {
+ this.repartitionCheckInterval = repartitionCheckInterval;
+ }
+
public static class SequentialFileBlockMetadataCodec
extends KryoSerializableStreamCodec<BlockMetadata.FileBlockMetadata>
{