You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/06 08:40:38 UTC
[41/50] ignite git commit: IGNITE-4276: Hadoop: added configurable
throttle for shuffle message sending. Disabled by default.
IGNITE-4276: Hadoop: added configurable throttle for shuffle message sending. Disabled by default.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/27753f9d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/27753f9d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/27753f9d
Branch: refs/heads/ignite-comm-balance-master
Commit: 27753f9d5a7fd3c81ee852eedf3ed27248028d4c
Parents: 31bc3bf
Author: devozerov <vo...@gridgain.com>
Authored: Mon Dec 5 14:57:25 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Dec 5 14:57:25 2016 +0300
----------------------------------------------------------------------
.../internal/processors/hadoop/HadoopJobProperty.java | 10 +++++++++-
.../processors/hadoop/shuffle/HadoopShuffleJob.java | 9 ++++++++-
2 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/27753f9d/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
index dcfbcba..e7bf565 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
@@ -81,7 +81,15 @@ public enum HadoopJobProperty {
* <p>
* By default is {@code false}.
*/
- SHUFFLE_REDUCER_NO_SORTING;
+ SHUFFLE_REDUCER_NO_SORTING,
+
+ /**
+ * Shuffle job throttle in milliseconds. When job is executed with separate shuffle thread, this parameter
+ * controls sleep duration between iterations through intermediate reducer maps.
+ * <p>
+ * Defaults to {@code 0}.
+ */
+ SHUFFLE_JOB_THROTTLE;
/** */
private final String ptyName;
http://git-wip-us.apache.org/repos/asf/ignite/blob/27753f9d/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index b940c72..8c731c0 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -55,6 +55,7 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.thread.IgniteThread;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
@@ -108,6 +109,9 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
/** */
private final IgniteLogger log;
+ /** */
+ private final long throttle;
+
/**
* @param locReduceAddr Local reducer address.
* @param log Logger.
@@ -136,6 +140,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
maps = new AtomicReferenceArray<>(totalReducerCnt);
msgs = new HadoopShuffleMessage[totalReducerCnt];
+
+ throttle = get(job.info(), SHUFFLE_JOB_THROTTLE, 0);
}
/**
@@ -175,7 +181,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
@Override protected void body() throws InterruptedException {
try {
while (!isCancelled()) {
- Thread.sleep(5);
+ if (throttle > 0)
+ Thread.sleep(throttle);
collectUpdatesAndSend(false);
}