You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/06 10:59:07 UTC
[3/6] ignite git commit: IGNITE-4301: Hadoop: Optimized shuffle so that only one ack is needed when running in embedded mode. This closes #1319.
IGNITE-4301: Hadoop: Optimized shuffle so that only one ack is needed when running in embedded mode. This closes #1319.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/79a16007
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/79a16007
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/79a16007
Branch: refs/heads/ignite-comm-balance-master
Commit: 79a1600713cbe7bba45017caa735b048a17362c1
Parents: 2582c5c
Author: devozerov <vo...@gridgain.com>
Authored: Tue Dec 6 13:23:05 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Dec 6 13:23:05 2016 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 12 +
.../shuffle/HadoopShuffleFinishRequest.java | 172 ++++++++++++++
.../shuffle/HadoopShuffleFinishResponse.java | 142 ++++++++++++
.../hadoop/shuffle/HadoopShuffle.java | 45 ++--
.../hadoop/shuffle/HadoopShuffleJob.java | 231 +++++++++++++++++--
.../hadoop/shuffle/HadoopShuffleLocalState.java | 67 ++++++
.../shuffle/HadoopShuffleRemoteState.java | 64 +++++
.../child/HadoopChildProcessRunner.java | 6 +-
8 files changed, 686 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/79a16007/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 86742e8..4ffb220 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -119,6 +119,8 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishRequest;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishResponse;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage;
import org.apache.ignite.internal.processors.igfs.IgfsAckMessage;
import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
@@ -168,6 +170,16 @@ public class GridIoMessageFactory implements MessageFactory {
Message msg = null;
switch (type) {
+ case -41:
+ msg = new HadoopShuffleFinishResponse();
+
+ break;
+
+ case -40:
+ msg = new HadoopShuffleFinishRequest();
+
+ break;
+
case -39:
msg = new HadoopJobId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/79a16007/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishRequest.java
new file mode 100644
index 0000000..f568c2e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishRequest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
+
+/**
+ * Shuffle finish request: tells the recipient how many shuffle messages the sender has sent to it for a job.
+ */
+public class HadoopShuffleFinishRequest implements Message, HadoopMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Job ID. */
+ @GridToStringInclude
+ private HadoopJobId jobId;
+
+ /** Total number of shuffle messages sent by the requester to the recipient. */
+ private long msgCnt;
+
+ /**
+ * Default constructor, used by the message factory before the fields are read.
+ */
+ public HadoopShuffleFinishRequest() {
+ // No-op.
+ }
+
+ /**
+ * Creates a finish request.
+ *
+ * @param jobId Job ID.
+ * @param msgCnt Total number of shuffle messages sent to the recipient.
+ */
+ public HadoopShuffleFinishRequest(HadoopJobId jobId, long msgCnt) {
+ this.jobId = jobId;
+ this.msgCnt = msgCnt;
+ }
+
+ /**
+ * @return Job ID.
+ */
+ public HadoopJobId jobId() {
+ return jobId;
+ }
+
+ /**
+ * @return Total number of shuffle messages sent to the recipient.
+ */
+ public long messageCount() {
+ return msgCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+ // Cases intentionally fall through so that writing continues from writer.state() up to the last field.
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeMessage("jobId", jobId))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeLong("msgCnt", msgCnt))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+ // Cases intentionally fall through so that reading continues from reader.state() up to the last field.
+ switch (reader.state()) {
+ case 0:
+ jobId = reader.readMessage("jobId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ msgCnt = reader.readLong("msgCnt");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(HadoopShuffleFinishRequest.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -40;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ jobId.writeExternal(out);
+
+ out.writeLong(msgCnt);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ jobId = new HadoopJobId();
+
+ jobId.readExternal(in);
+
+ msgCnt = in.readLong();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopShuffleFinishRequest.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/79a16007/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishResponse.java
new file mode 100644
index 0000000..4b7c93b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishResponse.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
+
+/**
+ * Shuffle finish response: confirms that the responder has processed every shuffle message announced by the finish request.
+ */
+public class HadoopShuffleFinishResponse implements Message, HadoopMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Job ID. */
+ @GridToStringInclude
+ private HadoopJobId jobId;
+
+ /**
+ * Default constructor, used by the message factory before the fields are read.
+ */
+ public HadoopShuffleFinishResponse() {
+ // No-op.
+ }
+
+ /**
+ * Creates a finish response for the given job.
+ *
+ * @param jobId Job ID.
+ */
+ public HadoopShuffleFinishResponse(HadoopJobId jobId) {
+ this.jobId = jobId;
+ }
+
+ /**
+ * @return Job ID.
+ */
+ public HadoopJobId jobId() {
+ return jobId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+ // State-indexed switch: writing continues from writer.state() (single field here).
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeMessage("jobId", jobId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+ // State-indexed switch: reading continues from reader.state() (single field here).
+ switch (reader.state()) {
+ case 0:
+ jobId = reader.readMessage("jobId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(HadoopShuffleFinishResponse.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -41;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ jobId.writeExternal(out);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ jobId = new HadoopJobId();
+
+ jobId.readExternal(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopShuffleFinishResponse.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/79a16007/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index 4450bf2..82bbd32 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -103,7 +103,7 @@ public class HadoopShuffle extends HadoopComponent {
HadoopMapReducePlan plan = ctx.jobTracker().plan(jobId);
HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log,
- ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()));
+ ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()), true);
UUID[] rdcAddrs = new UUID[plan.reducers()];
@@ -189,37 +189,34 @@ public class HadoopShuffle extends HadoopComponent {
* @return {@code True}.
*/
public boolean onMessageReceived(UUID src, HadoopMessage msg) {
- if (msg instanceof HadoopShuffleMessage) {
- HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
+ try {
+ if (msg instanceof HadoopShuffleMessage) {
+ HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
- try {
- job(m.jobId()).onShuffleMessage(m);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Message handling failed.", e);
+ job(m.jobId()).onShuffleMessage(src, m);
}
+ else if (msg instanceof HadoopShuffleAck) {
+ HadoopShuffleAck m = (HadoopShuffleAck)msg;
- try {
- // Reply with ack.
- send0(src, new HadoopShuffleAck(m.id(), m.jobId()));
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to reply back to shuffle message sender [snd=" + src + ", msg=" + msg + ']', e);
+ job(m.jobId()).onShuffleAck(m);
}
- }
- else if (msg instanceof HadoopShuffleAck) {
- HadoopShuffleAck m = (HadoopShuffleAck)msg;
+ else if (msg instanceof HadoopShuffleFinishRequest) {
+ HadoopShuffleFinishRequest m = (HadoopShuffleFinishRequest)msg;
- try {
- job(m.jobId()).onShuffleAck(m);
+ job(m.jobId()).onShuffleFinishRequest(src, m);
}
- catch (IgniteCheckedException e) {
- U.error(log, "Message handling failed.", e);
+ else if (msg instanceof HadoopShuffleFinishResponse) {
+ HadoopShuffleFinishResponse m = (HadoopShuffleFinishResponse)msg;
+
+ job(m.jobId()).onShuffleFinishResponse(src);
}
+ else
+ throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + src +
+ ", msg=" + msg + ']');
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Message handling failed.", e);
}
- else
- throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + src +
- ", msg=" + msg + ']');
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/79a16007/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 3afb55a..0a3a0ae 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.processors.hadoop.shuffle;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@@ -27,6 +29,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@ -118,9 +121,21 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
/** Message size. */
private final int msgSize;
+ /** Local shuffle states. */
+ private volatile HashMap<T, HadoopShuffleLocalState> locShuffleStates = new HashMap<>();
+
+ /** Remote shuffle states. */
+ private volatile HashMap<T, HadoopShuffleRemoteState> rmtShuffleStates = new HashMap<>();
+
+ /** Mutex for internal synchronization. */
+ private final Object mux = new Object();
+
/** */
private final long throttle;
+ /** Embedded mode flag. */
+ private final boolean embedded;
+
/**
* @param locReduceAddr Local reducer address.
* @param log Logger.
@@ -128,15 +143,17 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
* @param mem Memory.
* @param totalReducerCnt Amount of reducers in the Job.
* @param locReducers Reducers will work on current node.
+ * @param embedded Whether shuffle is running in embedded mode.
* @throws IgniteCheckedException If error.
*/
public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem,
- int totalReducerCnt, int[] locReducers) throws IgniteCheckedException {
+ int totalReducerCnt, int[] locReducers, boolean embedded) throws IgniteCheckedException {
this.locReduceAddr = locReduceAddr;
this.totalReducerCnt = totalReducerCnt;
this.job = job;
this.mem = mem;
this.log = log.getLogger(HadoopShuffleJob.class);
+ this.embedded = embedded;
msgSize = get(job.info(), SHUFFLE_MSG_SIZE, DFLT_SHUFFLE_MSG_SIZE);
@@ -238,10 +255,11 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
}
/**
+ * @param src Source.
* @param msg Message.
* @throws IgniteCheckedException Exception.
*/
- public void onShuffleMessage(HadoopShuffleMessage msg) throws IgniteCheckedException {
+ public void onShuffleMessage(T src, HadoopShuffleMessage msg) throws IgniteCheckedException {
assert msg.buffer() != null;
assert msg.offset() > 0;
@@ -276,6 +294,15 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
}
});
}
+
+ if (embedded) {
+ // No immediate response.
+ if (localShuffleState(src).onShuffleMessage())
+ sendFinishResponse(src, msg.jobId());
+ }
+ else
+ // Response for every message.
+ io.apply(src, new HadoopShuffleAck(msg.id(), msg.jobId()));
}
/**
@@ -292,6 +319,121 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
}
/**
+ * Processes a shuffle finish request: records the sender's total message count and replies once all are processed.
+ *
+ * @param src Node that sent the request.
+ * @param msg Shuffle finish request carrying the total message count.
+ */
+ public void onShuffleFinishRequest(T src, HadoopShuffleFinishRequest msg) {
+ if (log.isDebugEnabled())
+ log.debug("Received shuffle finish request [jobId=" + job.id() + ", src=" + src + ", req=" + msg + ']');
+
+ HadoopShuffleLocalState state = localShuffleState(src);
+ // Replies only if all announced messages are already processed; otherwise the last message triggers the reply.
+ if (state.onShuffleFinishMessage(msg.messageCount()))
+ sendFinishResponse(src, msg.jobId());
+ }
+
+ /**
+ * Processes a shuffle finish response by completing the remote shuffle state future for {@code src}.
+ *
+ * @param src Node that sent the response.
+ */
+ public void onShuffleFinishResponse(T src) {
+ if (log.isDebugEnabled())
+ log.debug("Received shuffle finish response [jobId=" + job.id() + ", src=" + src + ']');
+
+ remoteShuffleState(src).onShuffleFinishResponse();
+ }
+
+ /**
+ * Sends a shuffle finish response.
+ *
+ * @param dest Destination.
+ * @param jobId Job ID.
+ */
+ @SuppressWarnings("unchecked")
+ private void sendFinishResponse(T dest, HadoopJobId jobId) {
+ if (log.isDebugEnabled())
+ log.debug("Sent shuffle finish response [jobId=" + jobId + ", dest=" + dest + ']');
+ // NOTE(review): the "Sent" line above is logged before io.apply(), so it also appears if sending fails.
+ HadoopShuffleFinishResponse msg = new HadoopShuffleFinishResponse(jobId);
+
+ io.apply(dest, msg);
+ }
+
+ /**
+ * Gets the local (receiver-side) shuffle state for a sender, creating it on first access.
+ *
+ * @param src Source node.
+ * @return Local shuffle state.
+ */
+ private HadoopShuffleLocalState localShuffleState(T src) {
+ HashMap<T, HadoopShuffleLocalState> states = locShuffleStates;
+ // Lock-free fast path: published maps are never mutated, only replaced under mux.
+ HadoopShuffleLocalState res = states.get(src);
+
+ if (res == null) {
+ synchronized (mux) {
+ res = locShuffleStates.get(src);
+
+ if (res == null) {
+ res = new HadoopShuffleLocalState();
+ // Copy-on-write: publish a new map so lock-free readers never observe a partial update.
+ states = new HashMap<>(locShuffleStates);
+
+ states.put(src, res);
+
+ locShuffleStates = states;
+ }
+ }
+ }
+
+ return res;
+ }
+
+ /**
+ * Gets the remote (sender-side) shuffle state for a node, creating it on first access.
+ *
+ * @param src Remote node: the destination of sent messages or the sender of a finish response.
+ * @return Remote shuffle state.
+ */
+ private HadoopShuffleRemoteState remoteShuffleState(T src) {
+ HashMap<T, HadoopShuffleRemoteState> states = rmtShuffleStates;
+ // Lock-free fast path: published maps are never mutated, only replaced under mux.
+ HadoopShuffleRemoteState res = states.get(src);
+
+ if (res == null) {
+ synchronized (mux) {
+ res = rmtShuffleStates.get(src);
+
+ if (res == null) {
+ res = new HadoopShuffleRemoteState();
+ // Copy-on-write: publish a new map so lock-free readers never observe a partial update.
+ states = new HashMap<>(rmtShuffleStates);
+
+ states.put(src, res);
+
+ rmtShuffleStates = states;
+ }
+ }
+ }
+
+ return res;
+ }
+
+ /**
+ * Gets a snapshot of all remote shuffle states.
+ *
+ * @return Copy of the remote shuffle state map.
+ */
+ private HashMap<T, HadoopShuffleRemoteState> remoteShuffleStates() {
+ synchronized (mux) {
+ return new HashMap<>(rmtShuffleStates);
+ }
+ }
+
+ /**
* Unsafe value.
*/
private static class UnsafeValue implements HadoopMultimap.Value {
@@ -406,38 +548,50 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
* @param newBufMinSize Min new buffer size.
*/
private void send(final int idx, int newBufMinSize) {
- final GridFutureAdapter<?> fut = new GridFutureAdapter<>();
-
HadoopShuffleMessage msg = msgs[idx];
final long msgId = msg.id();
- IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> old = sentMsgs.putIfAbsent(msgId,
- new IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>(msg, fut));
+ final GridFutureAdapter<?> fut;
+
+ if (embedded)
+ fut = null;
+ else {
+ fut = new GridFutureAdapter<>();
+
+ IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> old = sentMsgs.putIfAbsent(msgId,
+ new IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>(msg, fut));
- assert old == null;
+ assert old == null;
+ }
try {
io.apply(reduceAddrs[idx], msg);
+
+ if (embedded)
+ remoteShuffleState(reduceAddrs[idx]).onShuffleMessage();
}
catch (GridClosureException e) {
- fut.onDone(U.unwrap(e));
+ if (fut != null)
+ fut.onDone(U.unwrap(e));
}
- fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> f) {
- try {
- f.get();
+ if (fut != null) {
+ fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
+ try {
+ f.get();
- // Clean up the future from map only if there was no exception.
- // Otherwise flush() should fail.
- sentMsgs.remove(msgId);
- }
- catch (IgniteCheckedException e) {
- log.error("Failed to send message.", e);
+ // Clean up the future from map only if there was no exception.
+ // Otherwise flush() should fail.
+ sentMsgs.remove(msgId);
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Failed to send message.", e);
+ }
}
- }
- });
+ });
+ }
msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx,
Math.max(msgSize, newBufMinSize));
@@ -513,14 +667,41 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
GridCompoundFuture fut = new GridCompoundFuture<>();
- for (IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> tup : sentMsgs.values())
- fut.add(tup.get2());
+ if (embedded) {
+ boolean sent = false;
- fut.markInitialized();
+ for (Map.Entry<T, HadoopShuffleRemoteState> rmtStateEntry : remoteShuffleStates().entrySet()) {
+ T dest = rmtStateEntry.getKey();
+ HadoopShuffleRemoteState rmtState = rmtStateEntry.getValue();
- if (log.isDebugEnabled())
- log.debug("Collected futures to compound futures for flush: " + sentMsgs.size());
+ HadoopShuffleFinishRequest req = new HadoopShuffleFinishRequest(job.id(), rmtState.messageCount());
+
+ io.apply(dest, req);
+ if (log.isDebugEnabled())
+ log.debug("Sent shuffle finish request [jobId=" + job.id() + ", dest=" + dest +
+ ", req=" + req + ']');
+
+ fut.add(rmtState.future());
+
+ sent = true;
+ }
+
+ if (sent)
+ fut.markInitialized();
+ else
+ return new GridFinishedFuture<>();
+ }
+ else {
+ for (IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> tup : sentMsgs.values())
+ fut.add(tup.get2());
+
+ fut.markInitialized();
+
+ if (log.isDebugEnabled())
+ log.debug("Collected futures to compound futures for flush: " + sentMsgs.size());
+
+ }
return fut;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/79a16007/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleLocalState.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleLocalState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleLocalState.java
new file mode 100644
index 0000000..68c0653
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleLocalState.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Receiver-side bookkeeping of the shuffle messages coming from a single sender.
+ */
+class HadoopShuffleLocalState {
+ /** Number of shuffle messages processed so far. */
+ private final AtomicLong rcvdCnt = new AtomicLong();
+
+ /** Set once the finish response is claimed, so it is sent at most once. */
+ private final AtomicBoolean replied = new AtomicBoolean();
+
+ /** Message count announced by the finish request (0 until it arrives). */
+ private volatile long expectedCnt;
+
+ /**
+ * Records one processed shuffle message.
+ *
+ * @return {@code True} if the caller must now send the finish response.
+ */
+ public boolean onShuffleMessage() {
+ // Arguments evaluate left to right: increment first, then read the announced count.
+ return maybeReply(rcvdCnt.incrementAndGet(), expectedCnt);
+ }
+
+ /**
+ * Records the total message count announced by the sender.
+ *
+ * @param totalMsgCnt Total number of messages the sender reports.
+ * @return {@code True} if the caller must now send the finish response.
+ */
+ public boolean onShuffleFinishMessage(long totalMsgCnt) {
+ expectedCnt = totalMsgCnt;
+
+ return maybeReply(rcvdCnt.get(), totalMsgCnt);
+ }
+
+ /**
+ * @param received Messages processed so far.
+ * @param expected Messages announced by the sender.
+ * @return {@code True} only for the first caller that observes completion.
+ */
+ private boolean maybeReply(long received, long expected) {
+ // CAS ensures a single reply even if both callbacks observe completion concurrently.
+ return received == expected && replied.compareAndSet(false, true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/79a16007/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java
new file mode 100644
index 0000000..5ffaa55
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Sender-side shuffle state for a single destination node.
+ */
+class HadoopShuffleRemoteState {
+ /** Number of shuffle messages sent to the destination. */
+ private final AtomicLong msgCnt = new AtomicLong();
+
+ /** Completed when the destination confirms that all sent messages were processed. */
+ private final GridFutureAdapter<?> fut = new GridFutureAdapter<>();
+
+ /**
+ * Callback invoked after a shuffle message has been sent to the destination.
+ */
+ public void onShuffleMessage() {
+ msgCnt.incrementAndGet();
+ }
+
+ /**
+ * Callback invoked when the destination's shuffle finish response arrives.
+ */
+ public void onShuffleFinishResponse() {
+ fut.onDone();
+ }
+
+ /**
+ * @return Number of shuffle messages sent so far.
+ */
+ public long messageCount() {
+ return msgCnt.get();
+ }
+
+ /**
+ * @return Future completed by {@link #onShuffleFinishResponse()}.
+ */
+ public GridFutureAdapter<?> future() {
+ return fut;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/79a16007/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
index 7001b8c..cb08c00 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
@@ -151,7 +151,7 @@ public class HadoopChildProcessRunner {
job.initialize(true, nodeDesc.processId());
shuffleJob = new HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem,
- req.totalReducerCount(), req.localReducers());
+ req.totalReducerCount(), req.localReducers(), false);
initializeExecutors(req);
@@ -432,9 +432,7 @@ public class HadoopChildProcessRunner {
try {
HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
- shuffleJob.onShuffleMessage(m);
-
- comm.sendMessage(desc, new HadoopShuffleAck(m.id(), m.jobId()));
+ shuffleJob.onShuffleMessage(desc, m);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to process hadoop shuffle message [desc=" + desc + ", msg=" + msg + ']', e);