You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/06 08:40:40 UTC

[43/50] ignite git commit: IGNITE-4274: Hadoop: added new property to control shuffle message size.

IGNITE-4274: Hadoop: added new property to control shuffle message size.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6e8c35b4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6e8c35b4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6e8c35b4

Branch: refs/heads/ignite-comm-balance-master
Commit: 6e8c35b42adf36c2600d6ea423d468e5cc11add4
Parents: 214197c
Author: devozerov <vo...@gridgain.com>
Authored: Mon Dec 5 15:28:54 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Dec 5 15:28:54 2016 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopJobProperty.java    | 37 ++++++++++++--------
 .../hadoop/shuffle/HadoopShuffleJob.java        | 12 +++++--
 2 files changed, 32 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6e8c35b4/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
index e7bf565..e713caa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
@@ -28,40 +28,40 @@ public enum HadoopJobProperty {
      * <p>
      * Setting it right allows to avoid rehashing.
      */
-    COMBINER_HASHMAP_SIZE,
+    COMBINER_HASHMAP_SIZE("ignite.combiner.hashmap.size"),
 
     /**
      * Initial size for hashmap which stores output of mapper or combiner and will be used as input of reducer.
      * <p>
      * Setting it right allows to avoid rehashing.
      */
-    PARTITION_HASHMAP_SIZE,
+    PARTITION_HASHMAP_SIZE("ignite.partition.hashmap.size"),
 
     /**
      * Specifies number of concurrently running mappers for external execution mode.
      * <p>
      * If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}.
      */
-    EXTERNAL_CONCURRENT_MAPPERS,
+    EXTERNAL_CONCURRENT_MAPPERS("ignite.external.concurrent.mappers"),
 
     /**
      * Specifies number of concurrently running reducers for external execution mode.
      * <p>
      * If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}.
      */
-    EXTERNAL_CONCURRENT_REDUCERS,
+    EXTERNAL_CONCURRENT_REDUCERS("ignite.external.concurrent.reducers"),
 
     /**
      * Delay in milliseconds after which Ignite server will reply job status.
      */
-    JOB_STATUS_POLL_DELAY,
+    JOB_STATUS_POLL_DELAY("ignite.job.status.poll.delay"),
 
     /**
      * Size in bytes of single memory page which will be allocated for data structures in shuffle.
      * <p>
      * By default is {@code 32 * 1024}.
      */
-    SHUFFLE_OFFHEAP_PAGE_SIZE,
+    SHUFFLE_OFFHEAP_PAGE_SIZE("ignite.shuffle.offheap.page.size"),
 
     /**
      * If set to {@code true} then input for combiner will not be sorted by key.
@@ -71,7 +71,7 @@ public enum HadoopJobProperty {
      * <p>
      * By default is {@code false}.
      */
-    SHUFFLE_COMBINER_NO_SORTING,
+    SHUFFLE_COMBINER_NO_SORTING("ignite.shuffle.combiner.no.sorting"),
 
     /**
      * If set to {@code true} then input for reducer will not be sorted by key.
@@ -81,7 +81,14 @@ public enum HadoopJobProperty {
      * <p>
      * By default is {@code false}.
      */
-    SHUFFLE_REDUCER_NO_SORTING,
+    SHUFFLE_REDUCER_NO_SORTING("ignite.shuffle.reducer.no.sorting"),
+
+    /**
+     * Defines approximate size in bytes of shuffle message which will be passed over wire from mapper to reducer.
+     * <p>
+     * By default is {@code 128 * 1024}.
+     */
+    SHUFFLE_MSG_SIZE("ignite.shuffle.message.size"),
 
     /**
      * Shuffle job throttle in milliseconds. When job is executed with separate shuffle thread, this parameter
@@ -89,23 +96,25 @@ public enum HadoopJobProperty {
      * <p>
      * Defaults to {@code 0}.
      */
-    SHUFFLE_JOB_THROTTLE;
+    SHUFFLE_JOB_THROTTLE("ignite.shuffle.job.throttle");
 
-    /** */
-    private final String ptyName;
+    /** Property name. */
+    private final String propName;
 
     /**
+     * Constructor.
      *
+     * @param propName Property name.
      */
-    HadoopJobProperty() {
-        ptyName = "ignite." + name().toLowerCase().replace('_', '.');
+    HadoopJobProperty(String propName) {
+        this.propName = propName;
     }
 
     /**
      * @return Property name.
      */
     public String propertyName() {
-        return ptyName;
+        return propName;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/6e8c35b4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 8c731c0..e5af8f1 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -55,6 +55,7 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.thread.IgniteThread;
 
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_SIZE;
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE;
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING;
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
@@ -64,7 +65,7 @@ import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get
  */
 public class HadoopShuffleJob<T> implements AutoCloseable {
     /** */
-    private static final int MSG_BUF_SIZE = 128 * 1024;
+    private static final int DFLT_SHUFFLE_MSG_SIZE = 128 * 1024;
 
     /** */
     private final HadoopJob job;
@@ -109,6 +110,9 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
     /** */
     private final IgniteLogger log;
 
+    /** Message size. */
+    private final int msgSize;
+
     /** */
     private final long throttle;
 
@@ -128,6 +132,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
         this.mem = mem;
         this.log = log.getLogger(HadoopShuffleJob.class);
 
+        msgSize = get(job.info(), SHUFFLE_MSG_SIZE, DFLT_SHUFFLE_MSG_SIZE);
+
         if (!F.isEmpty(locReducers)) {
             for (int rdc : locReducers) {
                 HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null);
@@ -320,7 +326,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
                 continue; // Skip empty map and local node.
 
             if (msgs[i] == null)
-                msgs[i] = new HadoopShuffleMessage(job.id(), i, MSG_BUF_SIZE);
+                msgs[i] = new HadoopShuffleMessage(job.id(), i, msgSize);
 
             final int idx = i;
 
@@ -425,7 +431,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
         });
 
         msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx,
-            Math.max(MSG_BUF_SIZE, newBufMinSize));
+            Math.max(msgSize, newBufMinSize));
     }
 
     /** {@inheritDoc} */