You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/12/05 13:28:15 UTC

ignite git commit: IGNITE-4355: Hadoop: Implemented parallel task context initialization during shuffle. This closes #1310. This closes #1313.

Repository: ignite
Updated Branches:
  refs/heads/master bc33d19ed -> acbb8aea8


IGNITE-4355: Hadoop: Implemented parallel task context initialization during shuffle. This closes #1310. This closes #1313.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/acbb8aea
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/acbb8aea
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/acbb8aea

Branch: refs/heads/master
Commit: acbb8aea8ddfab89947526241a6f47853efc5096
Parents: bc33d19
Author: devozerov <vo...@gridgain.com>
Authored: Mon Dec 5 16:28:05 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Dec 5 16:28:05 2016 +0300

----------------------------------------------------------------------
 .../hadoop/shuffle/HadoopShuffle.java           | 23 ++++++---
 .../hadoop/shuffle/HadoopShuffleJob.java        | 53 ++++++++++++++++++--
 2 files changed, 66 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/acbb8aea/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index a69e779..4450bf2 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -53,6 +53,9 @@ public class HadoopShuffle extends HadoopComponent {
     /** */
     protected final GridUnsafeMemory mem = new GridUnsafeMemory(0);
 
+    /** Mutex for iternal synchronization. */
+    private final Object mux = new Object();
+
     /** {@inheritDoc} */
     @Override public void start(HadoopContext ctx) throws IgniteCheckedException {
         super.start(ctx);
@@ -141,17 +144,23 @@ public class HadoopShuffle extends HadoopComponent {
         HadoopShuffleJob<UUID> res = jobs.get(jobId);
 
         if (res == null) {
-            res = newJob(jobId);
+            synchronized (mux) {
+                res = jobs.get(jobId);
+
+                if (res == null) {
+                    res = newJob(jobId);
 
-            HadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res);
+                    HadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res);
 
-            if (old != null) {
-                res.close();
+                    if (old != null) {
+                        res.close();
 
-                res = old;
+                        res = old;
+                    }
+                    else if (res.reducersInitialized())
+                        startSending(res);
+                }
             }
-            else if (res.reducersInitialized())
-                startSending(res);
         }
 
         return res;

http://git-wip-us.apache.org/repos/asf/ignite/blob/acbb8aea/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 9392b2c..aca5fdf 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -78,7 +78,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
     private final boolean needPartitioner;
 
     /** Collection of task contexts for each reduce task. */
-    private final Map<Integer, HadoopTaskContext> reducersCtx = new HashMap<>();
+    private final Map<Integer, LocalTaskContextProxy> reducersCtx = new HashMap<>();
 
     /** Reducers addresses. */
     private T[] reduceAddrs;
@@ -139,7 +139,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
             for (int rdc : locReducers) {
                 HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null);
 
-                reducersCtx.put(rdc, job.getTaskContext(taskInfo));
+                reducersCtx.put(rdc, new LocalTaskContextProxy(taskInfo));
             }
         }
 
@@ -237,7 +237,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
         assert msg.buffer() != null;
         assert msg.offset() > 0;
 
-        HadoopTaskContext taskCtx = reducersCtx.get(msg.reducer());
+        HadoopTaskContext taskCtx = reducersCtx.get(msg.reducer()).get();
 
         HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null);
 
@@ -623,4 +623,51 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
             }
         }
     }
+
+    /**
+     * Local task context proxy with delayed initialization.
+     */
+    private class LocalTaskContextProxy {
+        /** Mutex for synchronization. */
+        private final Object mux = new Object();
+
+        /** Task info. */
+        private final HadoopTaskInfo taskInfo;
+
+        /** Task context. */
+        private volatile HadoopTaskContext ctx;
+
+        /**
+         * Constructor.
+         *
+         * @param taskInfo Task info.
+         */
+        public LocalTaskContextProxy(HadoopTaskInfo taskInfo) {
+            this.taskInfo = taskInfo;
+        }
+
+        /**
+         * Get task context.
+         *
+         * @return Task context.
+         * @throws IgniteCheckedException If failed.
+         */
+        public HadoopTaskContext get() throws IgniteCheckedException {
+            HadoopTaskContext ctx0 = ctx;
+
+            if (ctx0 == null) {
+                synchronized (mux) {
+                    ctx0 = ctx;
+
+                    if (ctx0 == null) {
+                        ctx0 = job.getTaskContext(taskInfo);
+
+                        ctx = ctx0;
+                    }
+                }
+            }
+
+            return ctx0;
+        }
+    }
 }
\ No newline at end of file