You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/06 08:40:40 UTC
[43/50] ignite git commit: IGNITE-4274: Hadoop: added new property to
control shuffle message size.
IGNITE-4274: Hadoop: added new property to control shuffle message size.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6e8c35b4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6e8c35b4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6e8c35b4
Branch: refs/heads/ignite-comm-balance-master
Commit: 6e8c35b42adf36c2600d6ea423d468e5cc11add4
Parents: 214197c
Author: devozerov <vo...@gridgain.com>
Authored: Mon Dec 5 15:28:54 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Dec 5 15:28:54 2016 +0300
----------------------------------------------------------------------
.../processors/hadoop/HadoopJobProperty.java | 37 ++++++++++++--------
.../hadoop/shuffle/HadoopShuffleJob.java | 12 +++++--
2 files changed, 32 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6e8c35b4/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
index e7bf565..e713caa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
@@ -28,40 +28,40 @@ public enum HadoopJobProperty {
* <p>
* Setting it right allows to avoid rehashing.
*/
- COMBINER_HASHMAP_SIZE,
+ COMBINER_HASHMAP_SIZE("ignite.combiner.hashmap.size"),
/**
* Initial size for hashmap which stores output of mapper or combiner and will be used as input of reducer.
* <p>
* Setting it right allows to avoid rehashing.
*/
- PARTITION_HASHMAP_SIZE,
+ PARTITION_HASHMAP_SIZE("ignite.partition.hashmap.size"),
/**
* Specifies number of concurrently running mappers for external execution mode.
* <p>
* If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}.
*/
- EXTERNAL_CONCURRENT_MAPPERS,
+ EXTERNAL_CONCURRENT_MAPPERS("ignite.external.concurrent.mappers"),
/**
* Specifies number of concurrently running reducers for external execution mode.
* <p>
* If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}.
*/
- EXTERNAL_CONCURRENT_REDUCERS,
+ EXTERNAL_CONCURRENT_REDUCERS("ignite.external.concurrent.reducers"),
/**
* Delay in milliseconds after which Ignite server will reply job status.
*/
- JOB_STATUS_POLL_DELAY,
+ JOB_STATUS_POLL_DELAY("ignite.job.status.poll.delay"),
/**
* Size in bytes of single memory page which will be allocated for data structures in shuffle.
* <p>
* By default is {@code 32 * 1024}.
*/
- SHUFFLE_OFFHEAP_PAGE_SIZE,
+ SHUFFLE_OFFHEAP_PAGE_SIZE("ignite.shuffle.offheap.page.size"),
/**
* If set to {@code true} then input for combiner will not be sorted by key.
@@ -71,7 +71,7 @@ public enum HadoopJobProperty {
* <p>
* By default is {@code false}.
*/
- SHUFFLE_COMBINER_NO_SORTING,
+ SHUFFLE_COMBINER_NO_SORTING("ignite.shuffle.combiner.no.sorting"),
/**
* If set to {@code true} then input for reducer will not be sorted by key.
@@ -81,7 +81,14 @@ public enum HadoopJobProperty {
* <p>
* By default is {@code false}.
*/
- SHUFFLE_REDUCER_NO_SORTING,
+ SHUFFLE_REDUCER_NO_SORTING("ignite.shuffle.reducer.no.sorting"),
+
+ /**
+ * Defines approximate size in bytes of shuffle message which will be passed over wire from mapper to reducer.
+ * <p>
+ * Defaults to 128Kb.
+ */
+ SHUFFLE_MSG_SIZE("ignite.shuffle.message.size"),
/**
* Shuffle job throttle in milliseconds. When job is executed with separate shuffle thread, this parameter
@@ -89,23 +96,25 @@ public enum HadoopJobProperty {
* <p>
* Defaults to {@code 0}.
*/
- SHUFFLE_JOB_THROTTLE;
+ SHUFFLE_JOB_THROTTLE("ignite.shuffle.job.throttle");
- /** */
- private final String ptyName;
+ /** Property name. */
+ private final String propName;
/**
+ * Constrcutor.
*
+ * @param propName Property name.
*/
- HadoopJobProperty() {
- ptyName = "ignite." + name().toLowerCase().replace('_', '.');
+ HadoopJobProperty(String propName) {
+ this.propName = propName;
}
/**
* @return Property name.
*/
public String propertyName() {
- return ptyName;
+ return propName;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/6e8c35b4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 8c731c0..e5af8f1 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -55,6 +55,7 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.thread.IgniteThread;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_SIZE;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
@@ -64,7 +65,7 @@ import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get
*/
public class HadoopShuffleJob<T> implements AutoCloseable {
/** */
- private static final int MSG_BUF_SIZE = 128 * 1024;
+ private static final int DFLT_SHUFFLE_MSG_SIZE = 128 * 1024;
/** */
private final HadoopJob job;
@@ -109,6 +110,9 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
/** */
private final IgniteLogger log;
+ /** Message size. */
+ private final int msgSize;
+
/** */
private final long throttle;
@@ -128,6 +132,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
this.mem = mem;
this.log = log.getLogger(HadoopShuffleJob.class);
+ msgSize = get(job.info(), SHUFFLE_MSG_SIZE, DFLT_SHUFFLE_MSG_SIZE);
+
if (!F.isEmpty(locReducers)) {
for (int rdc : locReducers) {
HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null);
@@ -320,7 +326,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
continue; // Skip empty map and local node.
if (msgs[i] == null)
- msgs[i] = new HadoopShuffleMessage(job.id(), i, MSG_BUF_SIZE);
+ msgs[i] = new HadoopShuffleMessage(job.id(), i, msgSize);
final int idx = i;
@@ -425,7 +431,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
});
msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx,
- Math.max(MSG_BUF_SIZE, newBufMinSize));
+ Math.max(msgSize, newBufMinSize));
}
/** {@inheritDoc} */