Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/22 15:16:01 UTC
[21/50] [abbrv] ignite git commit: IGNITE-4281: Hadoop: decoupled remote and local maps to simplify further optimizations. This closes #1264. This closes #1315.
IGNITE-4281: Hadoop: decoupled remote and local maps to simplify further optimizations. This closes #1264. This closes #1315.
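
For context on the patch below: the single per-reducer map array is split in two. Records for reducers running on the local node land in locMaps and are consumed in-process, while records for all other partitions go to rmtMaps, which is the only store collectUpdatesAndSend has to scan. A minimal, self-contained sketch of that routing follows (plain Java with hypothetical names, not the Ignite API; single-threaded for brevity):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class SplitShuffleStoreSketch {
        // Stand-ins for the per-reducer HadoopMultimap instances, keyed by partition id.
        private final ConcurrentMap<Integer, StringBuilder> locMaps = new ConcurrentHashMap<>();
        private final ConcurrentMap<Integer, StringBuilder> rmtMaps = new ConcurrentHashMap<>();

        // Partitions (reducers) assigned to this node.
        private final Set<Integer> localParts;

        public SplitShuffleStoreSketch(Set<Integer> localParts) {
            this.localParts = localParts;
        }

        // Mirrors isLocalPartition(part): a partition is local iff a local reducer runs here.
        private boolean isLocalPartition(int part) {
            return localParts.contains(part);
        }

        // Route a record: local partitions stay in locMaps, everything else goes to rmtMaps.
        public void write(int part, String key, String val) {
            ConcurrentMap<Integer, StringBuilder> target = isLocalPartition(part) ? locMaps : rmtMaps;

            target.computeIfAbsent(part, p -> new StringBuilder())
                .append(key).append('=').append(val).append('\n');
        }

        // Only the remote store is drained when flushing updates over the wire
        // (the analogue of collectUpdatesAndSend in the patch).
        public ConcurrentMap<Integer, StringBuilder> remoteMaps() {
            return rmtMaps;
        }
    }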
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/be12a7ea
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/be12a7ea
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/be12a7ea
Branch: refs/heads/master
Commit: be12a7ea242dedba932c15dce005540c34711e77
Parents: 04cff9b
Author: devozerov <vo...@gridgain.com>
Authored: Mon Dec 5 17:09:28 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Dec 15 13:46:01 2016 +0300
----------------------------------------------------------------------
.../hadoop/shuffle/HadoopShuffleJob.java | 85 ++++++++++++++------
1 file changed, 61 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/be12a7ea/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index aca5fdf..3afb55a 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -17,9 +17,7 @@
package org.apache.ignite.internal.processors.hadoop.shuffle;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@@ -56,8 +54,8 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.thread.IgniteThread;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE;
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_SIZE;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_SIZE;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
@@ -77,20 +75,26 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
/** */
private final boolean needPartitioner;
- /** Collection of task contexts for each reduce task. */
- private final Map<Integer, LocalTaskContextProxy> reducersCtx = new HashMap<>();
+ /** Task contexts for each reduce task. */
+ private final AtomicReferenceArray<LocalTaskContextProxy> locReducersCtx;
/** Reducers addresses. */
private T[] reduceAddrs;
+ /** Total reducer count. */
+ private final int totalReducerCnt;
+
/** Local reducers address. */
private final T locReduceAddr;
/** */
private final HadoopShuffleMessage[] msgs;
- /** */
- private final AtomicReferenceArray<HadoopMultimap> maps;
+ /** Maps for local reducers. */
+ private final AtomicReferenceArray<HadoopMultimap> locMaps;
+
+ /** Maps for remote reducers. */
+ private final AtomicReferenceArray<HadoopMultimap> rmtMaps;
/** */
private volatile IgniteInClosure2X<T, HadoopMessage> io;
@@ -129,23 +133,27 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem,
int totalReducerCnt, int[] locReducers) throws IgniteCheckedException {
this.locReduceAddr = locReduceAddr;
+ this.totalReducerCnt = totalReducerCnt;
this.job = job;
this.mem = mem;
this.log = log.getLogger(HadoopShuffleJob.class);
msgSize = get(job.info(), SHUFFLE_MSG_SIZE, DFLT_SHUFFLE_MSG_SIZE);
+ locReducersCtx = new AtomicReferenceArray<>(totalReducerCnt);
+
if (!F.isEmpty(locReducers)) {
for (int rdc : locReducers) {
HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null);
- reducersCtx.put(rdc, new LocalTaskContextProxy(taskInfo));
+ locReducersCtx.set(rdc, new LocalTaskContextProxy(taskInfo));
}
}
needPartitioner = totalReducerCnt > 1;
- maps = new AtomicReferenceArray<>(totalReducerCnt);
+ locMaps = new AtomicReferenceArray<>(totalReducerCnt);
+ rmtMaps = new AtomicReferenceArray<>(totalReducerCnt);
msgs = new HadoopShuffleMessage[totalReducerCnt];
throttle = get(job.info(), SHUFFLE_JOB_THROTTLE, 0);
@@ -237,13 +245,13 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
assert msg.buffer() != null;
assert msg.offset() > 0;
- HadoopTaskContext taskCtx = reducersCtx.get(msg.reducer()).get();
+ HadoopTaskContext taskCtx = locReducersCtx.get(msg.reducer()).get();
HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null);
perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis());
- HadoopMultimap map = getOrCreateMap(maps, msg.reducer());
+ HadoopMultimap map = getOrCreateMap(locMaps, msg.reducer());
// Add data from message to the map.
try (HadoopMultimap.Adder adder = map.startAdding(taskCtx)) {
@@ -320,10 +328,10 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
* Sends map updates to remote reducers.
*/
private void collectUpdatesAndSend(boolean flush) throws IgniteCheckedException {
- for (int i = 0; i < maps.length(); i++) {
- HadoopMultimap map = maps.get(i);
+ for (int i = 0; i < rmtMaps.length(); i++) {
+ HadoopMultimap map = rmtMaps.get(i);
- if (map == null || locReduceAddr.equals(reduceAddrs[i]))
+ if (map == null)
continue; // Skip empty map and local node.
if (msgs[i] == null)
@@ -448,7 +456,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
}
}
- close(maps);
+ close(locMaps);
+ close(rmtMaps);
}
/**
@@ -473,7 +482,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
flushed = true;
- if (maps.length() == 0)
+ if (totalReducerCnt == 0)
return new GridFinishedFuture<>();
U.await(ioInitLatch);
@@ -544,7 +553,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
case REDUCE:
int reducer = taskCtx.taskInfo().taskNumber();
- HadoopMultimap m = maps.get(reducer);
+ HadoopMultimap m = locMaps.get(reducer);
if (m != null)
return m.input(taskCtx);
@@ -573,11 +582,24 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
}
/**
+ * Check if certain partition (reducer) is local.
+ *
+ * @param part Partition.
+ * @return {@code True} if local.
+ */
+ private boolean isLocalPartition(int part) {
+ return locReducersCtx.get(part) != null;
+ }
+
+ /**
* Partitioned output.
*/
private class PartitionedOutput implements HadoopTaskOutput {
/** */
- private final HadoopTaskOutput[] adders = new HadoopTaskOutput[maps.length()];
+ private final HadoopTaskOutput[] locAdders = new HadoopTaskOutput[locMaps.length()];
+
+ /** */
+ private final HadoopTaskOutput[] rmtAdders = new HadoopTaskOutput[rmtMaps.length()];
/** */
private HadoopPartitioner partitioner;
@@ -601,23 +623,38 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
int part = 0;
if (partitioner != null) {
- part = partitioner.partition(key, val, adders.length);
+ part = partitioner.partition(key, val, totalReducerCnt);
- if (part < 0 || part >= adders.length)
+ if (part < 0 || part >= totalReducerCnt)
throw new IgniteCheckedException("Invalid partition: " + part);
}
- HadoopTaskOutput out = adders[part];
+ HadoopTaskOutput out;
- if (out == null)
- adders[part] = out = getOrCreateMap(maps, part).startAdding(taskCtx);
+ if (isLocalPartition(part)) {
+ out = locAdders[part];
+
+ if (out == null)
+ locAdders[part] = out = getOrCreateMap(locMaps, part).startAdding(taskCtx);
+ }
+ else {
+ out = rmtAdders[part];
+
+ if (out == null)
+ rmtAdders[part] = out = getOrCreateMap(rmtMaps, part).startAdding(taskCtx);
+ }
out.write(key, val);
}
/** {@inheritDoc} */
@Override public void close() throws IgniteCheckedException {
- for (HadoopTaskOutput adder : adders) {
+ for (HadoopTaskOutput adder : locAdders) {
+ if (adder != null)
+ adder.close();
+ }
+
+ for (HadoopTaskOutput adder : rmtAdders) {
if (adder != null)
adder.close();
}