You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/06 08:40:45 UTC

[48/50] ignite git commit: IGNITE-4281: Hadoop: decoupled remote and local maps to simplify further optimizations. This closes #1264. This closes #1315.

IGNITE-4281: Hadoop: decoupled remote and local maps to simplify further optimizations. This closes #1264. This closes #1315.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9e9e3714
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9e9e3714
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9e9e3714

Branch: refs/heads/ignite-comm-balance-master
Commit: 9e9e371468baef4e2bc7b9fc4c3089e6d073c014
Parents: acbb8ae
Author: devozerov <vo...@gridgain.com>
Authored: Mon Dec 5 17:09:28 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Dec 5 17:09:28 2016 +0300

----------------------------------------------------------------------
 .../hadoop/shuffle/HadoopShuffleJob.java        | 85 ++++++++++++++------
 1 file changed, 61 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9e9e3714/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index aca5fdf..3afb55a 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -17,9 +17,7 @@
 
 package org.apache.ignite.internal.processors.hadoop.shuffle;
 
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
@@ -56,8 +54,8 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.thread.IgniteThread;
 
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE;
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_SIZE;
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_SIZE;
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING;
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
 
@@ -77,20 +75,26 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
     /** */
     private final boolean needPartitioner;
 
-    /** Collection of task contexts for each reduce task. */
-    private final Map<Integer, LocalTaskContextProxy> reducersCtx = new HashMap<>();
+    /** Task contexts for each reduce task. */
+    private final AtomicReferenceArray<LocalTaskContextProxy> locReducersCtx;
 
     /** Reducers addresses. */
     private T[] reduceAddrs;
 
+    /** Total reducer count. */
+    private final int totalReducerCnt;
+
     /** Local reducers address. */
     private final T locReduceAddr;
 
     /** */
     private final HadoopShuffleMessage[] msgs;
 
-    /** */
-    private final AtomicReferenceArray<HadoopMultimap> maps;
+    /** Maps for local reducers. */
+    private final AtomicReferenceArray<HadoopMultimap> locMaps;
+
+    /** Maps for remote reducers. */
+    private final AtomicReferenceArray<HadoopMultimap> rmtMaps;
 
     /** */
     private volatile IgniteInClosure2X<T, HadoopMessage> io;
@@ -129,23 +133,27 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
     public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem,
         int totalReducerCnt, int[] locReducers) throws IgniteCheckedException {
         this.locReduceAddr = locReduceAddr;
+        this.totalReducerCnt = totalReducerCnt;
         this.job = job;
         this.mem = mem;
         this.log = log.getLogger(HadoopShuffleJob.class);
 
         msgSize = get(job.info(), SHUFFLE_MSG_SIZE, DFLT_SHUFFLE_MSG_SIZE);
 
+        locReducersCtx = new AtomicReferenceArray<>(totalReducerCnt);
+
         if (!F.isEmpty(locReducers)) {
             for (int rdc : locReducers) {
                 HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null);
 
-                reducersCtx.put(rdc, new LocalTaskContextProxy(taskInfo));
+                locReducersCtx.set(rdc, new LocalTaskContextProxy(taskInfo));
             }
         }
 
         needPartitioner = totalReducerCnt > 1;
 
-        maps = new AtomicReferenceArray<>(totalReducerCnt);
+        locMaps = new AtomicReferenceArray<>(totalReducerCnt);
+        rmtMaps = new AtomicReferenceArray<>(totalReducerCnt);
         msgs = new HadoopShuffleMessage[totalReducerCnt];
 
         throttle = get(job.info(), SHUFFLE_JOB_THROTTLE, 0);
@@ -237,13 +245,13 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
         assert msg.buffer() != null;
         assert msg.offset() > 0;
 
-        HadoopTaskContext taskCtx = reducersCtx.get(msg.reducer()).get();
+        HadoopTaskContext taskCtx = locReducersCtx.get(msg.reducer()).get();
 
         HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null);
 
         perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis());
 
-        HadoopMultimap map = getOrCreateMap(maps, msg.reducer());
+        HadoopMultimap map = getOrCreateMap(locMaps, msg.reducer());
 
         // Add data from message to the map.
         try (HadoopMultimap.Adder adder = map.startAdding(taskCtx)) {
@@ -320,10 +328,10 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
      * Sends map updates to remote reducers.
      */
     private void collectUpdatesAndSend(boolean flush) throws IgniteCheckedException {
-        for (int i = 0; i < maps.length(); i++) {
-            HadoopMultimap map = maps.get(i);
+        for (int i = 0; i < rmtMaps.length(); i++) {
+            HadoopMultimap map = rmtMaps.get(i);
 
-            if (map == null || locReduceAddr.equals(reduceAddrs[i]))
+            if (map == null)
                 continue; // Skip empty map and local node.
 
             if (msgs[i] == null)
@@ -448,7 +456,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
             }
         }
 
-        close(maps);
+        close(locMaps);
+        close(rmtMaps);
     }
 
     /**
@@ -473,7 +482,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
 
         flushed = true;
 
-        if (maps.length() == 0)
+        if (totalReducerCnt == 0)
             return new GridFinishedFuture<>();
 
         U.await(ioInitLatch);
@@ -544,7 +553,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
             case REDUCE:
                 int reducer = taskCtx.taskInfo().taskNumber();
 
-                HadoopMultimap m = maps.get(reducer);
+                HadoopMultimap m = locMaps.get(reducer);
 
                 if (m != null)
                     return m.input(taskCtx);
@@ -573,11 +582,24 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
     }
 
     /**
+     * Checks whether the given partition (reducer) is local.
+     *
+     * @param part Partition (reducer) index; looked up in {@code locReducersCtx}.
+     * @return {@code True} if a local reducer context is registered for the partition.
+     */
+    private boolean isLocalPartition(int part) {
+        return locReducersCtx.get(part) != null;
+    }
+
+    /**
      * Partitioned output.
      */
     private class PartitionedOutput implements HadoopTaskOutput {
         /** */
-        private final HadoopTaskOutput[] adders = new HadoopTaskOutput[maps.length()];
+        private final HadoopTaskOutput[] locAdders = new HadoopTaskOutput[locMaps.length()];
+
+        /** */
+        private final HadoopTaskOutput[] rmtAdders = new HadoopTaskOutput[rmtMaps.length()];
 
         /** */
         private HadoopPartitioner partitioner;
@@ -601,23 +623,38 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
             int part = 0;
 
             if (partitioner != null) {
-                part = partitioner.partition(key, val, adders.length);
+                part = partitioner.partition(key, val, totalReducerCnt);
 
-                if (part < 0 || part >= adders.length)
+                if (part < 0 || part >= totalReducerCnt)
                     throw new IgniteCheckedException("Invalid partition: " + part);
             }
 
-            HadoopTaskOutput out = adders[part];
+            HadoopTaskOutput out;
 
-            if (out == null)
-                adders[part] = out = getOrCreateMap(maps, part).startAdding(taskCtx);
+            if (isLocalPartition(part)) {
+                out = locAdders[part];
+
+                if (out == null)
+                    locAdders[part] = out = getOrCreateMap(locMaps, part).startAdding(taskCtx);
+            }
+            else {
+                out = rmtAdders[part];
+
+                if (out == null)
+                    rmtAdders[part] = out = getOrCreateMap(rmtMaps, part).startAdding(taskCtx);
+            }
 
             out.write(key, val);
         }
 
         /** {@inheritDoc} */
         @Override public void close() throws IgniteCheckedException {
-            for (HadoopTaskOutput adder : adders) {
+            for (HadoopTaskOutput adder : locAdders) {
+                if (adder != null)
+                    adder.close();
+            }
+
+            for (HadoopTaskOutput adder : rmtAdders) {
                 if (adder != null)
                     adder.close();
             }