You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/06 08:40:38 UTC

[41/50] ignite git commit: IGNITE-4276: Hadoop: added configurable throttle for shuffle message sending. Disabled by default.

IGNITE-4276: Hadoop: added configurable throttle for shuffle message sending. Disabled by default.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/27753f9d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/27753f9d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/27753f9d

Branch: refs/heads/ignite-comm-balance-master
Commit: 27753f9d5a7fd3c81ee852eedf3ed27248028d4c
Parents: 31bc3bf
Author: devozerov <vo...@gridgain.com>
Authored: Mon Dec 5 14:57:25 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Dec 5 14:57:25 2016 +0300

----------------------------------------------------------------------
 .../internal/processors/hadoop/HadoopJobProperty.java     | 10 +++++++++-
 .../processors/hadoop/shuffle/HadoopShuffleJob.java       |  9 ++++++++-
 2 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/27753f9d/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
index dcfbcba..e7bf565 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
@@ -81,7 +81,15 @@ public enum HadoopJobProperty {
      * <p>
      * By default is {@code false}.
      */
-    SHUFFLE_REDUCER_NO_SORTING;
+    SHUFFLE_REDUCER_NO_SORTING,
+
+    /**
+     * Shuffle job throttle in milliseconds. When job is executed with separate shuffle thread, this parameter
+     * controls sleep duration between iterations through intermediate reducer maps.
+     * <p>
+     * Defaults to {@code 0}.
+     */
+    SHUFFLE_JOB_THROTTLE;
 
     /** */
     private final String ptyName;

http://git-wip-us.apache.org/repos/asf/ignite/blob/27753f9d/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index b940c72..8c731c0 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -55,6 +55,7 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.thread.IgniteThread;
 
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE;
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING;
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
 
@@ -108,6 +109,9 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
     /** */
     private final IgniteLogger log;
 
+    /** */
+    private final long throttle;
+
     /**
      * @param locReduceAddr Local reducer address.
      * @param log Logger.
@@ -136,6 +140,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
 
         maps = new AtomicReferenceArray<>(totalReducerCnt);
         msgs = new HadoopShuffleMessage[totalReducerCnt];
+
+        throttle = get(job.info(), SHUFFLE_JOB_THROTTLE, 0);
     }
 
     /**
@@ -175,7 +181,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
                 @Override protected void body() throws InterruptedException {
                     try {
                         while (!isCancelled()) {
-                            Thread.sleep(5);
+                            if (throttle > 0)
+                                Thread.sleep(throttle);
 
                             collectUpdatesAndSend(false);
                         }