You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/12/05 13:28:15 UTC
ignite git commit: IGNITE-4355: Hadoop: Implemented parallel task
context initialization during shuffle. This closes #1310. This closes #1313.
Repository: ignite
Updated Branches:
refs/heads/master bc33d19ed -> acbb8aea8
IGNITE-4355: Hadoop: Implemented parallel task context initialization during shuffle. This closes #1310. This closes #1313.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/acbb8aea
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/acbb8aea
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/acbb8aea
Branch: refs/heads/master
Commit: acbb8aea8ddfab89947526241a6f47853efc5096
Parents: bc33d19
Author: devozerov <vo...@gridgain.com>
Authored: Mon Dec 5 16:28:05 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Dec 5 16:28:05 2016 +0300
----------------------------------------------------------------------
.../hadoop/shuffle/HadoopShuffle.java | 23 ++++++---
.../hadoop/shuffle/HadoopShuffleJob.java | 53 ++++++++++++++++++--
2 files changed, 66 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/acbb8aea/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index a69e779..4450bf2 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -53,6 +53,9 @@ public class HadoopShuffle extends HadoopComponent {
/** */
protected final GridUnsafeMemory mem = new GridUnsafeMemory(0);
+ /** Mutex for iternal synchronization. */
+ private final Object mux = new Object();
+
/** {@inheritDoc} */
@Override public void start(HadoopContext ctx) throws IgniteCheckedException {
super.start(ctx);
@@ -141,17 +144,23 @@ public class HadoopShuffle extends HadoopComponent {
HadoopShuffleJob<UUID> res = jobs.get(jobId);
if (res == null) {
- res = newJob(jobId);
+ synchronized (mux) {
+ res = jobs.get(jobId);
+
+ if (res == null) {
+ res = newJob(jobId);
- HadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res);
+ HadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res);
- if (old != null) {
- res.close();
+ if (old != null) {
+ res.close();
- res = old;
+ res = old;
+ }
+ else if (res.reducersInitialized())
+ startSending(res);
+ }
}
- else if (res.reducersInitialized())
- startSending(res);
}
return res;
http://git-wip-us.apache.org/repos/asf/ignite/blob/acbb8aea/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 9392b2c..aca5fdf 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -78,7 +78,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
private final boolean needPartitioner;
/** Collection of task contexts for each reduce task. */
- private final Map<Integer, HadoopTaskContext> reducersCtx = new HashMap<>();
+ private final Map<Integer, LocalTaskContextProxy> reducersCtx = new HashMap<>();
/** Reducers addresses. */
private T[] reduceAddrs;
@@ -139,7 +139,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
for (int rdc : locReducers) {
HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null);
- reducersCtx.put(rdc, job.getTaskContext(taskInfo));
+ reducersCtx.put(rdc, new LocalTaskContextProxy(taskInfo));
}
}
@@ -237,7 +237,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
assert msg.buffer() != null;
assert msg.offset() > 0;
- HadoopTaskContext taskCtx = reducersCtx.get(msg.reducer());
+ HadoopTaskContext taskCtx = reducersCtx.get(msg.reducer()).get();
HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null);
@@ -623,4 +623,51 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
}
}
}
+
+ /**
+ * Local task context proxy with delayed initialization.
+ */
+ private class LocalTaskContextProxy {
+ /** Mutex for synchronization. */
+ private final Object mux = new Object();
+
+ /** Task info. */
+ private final HadoopTaskInfo taskInfo;
+
+ /** Task context. */
+ private volatile HadoopTaskContext ctx;
+
+ /**
+ * Constructor.
+ *
+ * @param taskInfo Task info.
+ */
+ public LocalTaskContextProxy(HadoopTaskInfo taskInfo) {
+ this.taskInfo = taskInfo;
+ }
+
+ /**
+ * Get task context.
+ *
+ * @return Task context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public HadoopTaskContext get() throws IgniteCheckedException {
+ HadoopTaskContext ctx0 = ctx;
+
+ if (ctx0 == null) {
+ synchronized (mux) {
+ ctx0 = ctx;
+
+ if (ctx0 == null) {
+ ctx0 = job.getTaskContext(taskInfo);
+
+ ctx = ctx0;
+ }
+ }
+ }
+
+ return ctx0;
+ }
+ }
}
\ No newline at end of file