You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/11/21 14:34:15 UTC
ignite git commit: Other fixes.
Repository: ignite
Updated Branches:
refs/heads/ignite-1.6.10-hadoop-debug c2bff238e -> 5c733d5a9
Other fixes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5c733d5a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5c733d5a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5c733d5a
Branch: refs/heads/ignite-1.6.10-hadoop-debug
Commit: 5c733d5a9613a1893cdf2f66f5df27db61618d22
Parents: c2bff23
Author: devozerov <vo...@gridgain.com>
Authored: Mon Nov 21 17:34:02 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Nov 21 17:34:02 2016 +0300
----------------------------------------------------------------------
.../hadoop/shuffle/HadoopShuffle.java | 30 +++++++++++++++-----
.../hadoop/shuffle/HadoopShuffleJob.java | 22 ++++++++++++--
2 files changed, 43 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5c733d5a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index 23328ba..c1d8cfd 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -17,9 +17,6 @@
package org.apache.ignite.internal.processors.hadoop.shuffle;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridTopic;
@@ -35,12 +32,18 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
/**
* Shuffle.
@@ -98,8 +101,10 @@ public class HadoopShuffle extends HadoopComponent {
private HadoopShuffleJob<UUID> newJob(HadoopJobId jobId) throws IgniteCheckedException {
HadoopMapReducePlan plan = ctx.jobTracker().plan(jobId);
+ int bufSize = ((TcpCommunicationSpi)ctx.kernalContext().config().getCommunicationSpi()).getSocketSendBuffer();
+
HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log,
- ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()));
+ ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()), bufSize);
UUID[] rdcAddrs = new UUID[plan.reducers()];
@@ -123,11 +128,22 @@ public class HadoopShuffle extends HadoopComponent {
* @param msg Message to send.
* @throws IgniteCheckedException If send failed.
*/
- private void send0(UUID nodeId, Object msg) throws IgniteCheckedException {
+ private void send0(final UUID nodeId, final Object msg) throws IgniteCheckedException {
ClusterNode node = ctx.kernalContext().discovery().node(nodeId);
- if (msg instanceof Message)
- ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP2, (Message)msg, GridIoPolicy.PUBLIC_POOL);
+ if (msg instanceof Message) {
+ if (msg instanceof HadoopShuffleMessage && F.eq(nodeId, ctx.localNodeId())) {
+ ctx.kernalContext().closure().callLocalSafe(new GridPlainCallable<Void>() {
+ @Override public Void call() throws Exception {
+ onMessageReceived(nodeId, (HadoopMessage)msg);
+
+ return null;
+ }
+ }, false);
+ }
+ else
+ ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP2, (Message) msg, GridIoPolicy.PUBLIC_POOL);
+ }
else
ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5c733d5a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 1411b36..f586234 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -108,6 +108,9 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
/** */
private final IgniteLogger log;
+ /** Shuffle message buffer size. */
+ private final int bufSize;
+
/**
* @param locReduceAddr Local reducer address.
* @param log Logger.
@@ -119,10 +122,25 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
*/
public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem,
int totalReducerCnt, int[] locReducers) throws IgniteCheckedException {
+ this(locReduceAddr, log, job, mem, totalReducerCnt, locReducers, MSG_BUF_SIZE);
+ }
+
+ /**
+ * @param locReduceAddr Local reducer address.
+ * @param log Logger.
+ * @param job Job.
+ * @param mem Memory.
+ * @param totalReducerCnt Amount of reducers in the Job.
+ * @param locReducers Reducers that will work on the current node.
+ * @throws IgniteCheckedException If error.
+ */
+ public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem,
+ int totalReducerCnt, int[] locReducers, int bufSize) throws IgniteCheckedException {
this.locReduceAddr = locReduceAddr;
this.job = job;
this.mem = mem;
this.log = log.getLogger(HadoopShuffleJob.class);
+ this.bufSize = bufSize;
if (!F.isEmpty(locReducers)) {
for (int rdc : locReducers) {
@@ -311,7 +329,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
continue; // Skip empty map and local node.
if (msgs[i] == null)
- msgs[i] = new HadoopShuffleMessage(job.id(), i, MSG_BUF_SIZE);
+ msgs[i] = new HadoopShuffleMessage(job.id(), i, bufSize);
final int idx = i;
@@ -416,7 +434,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
});
msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx,
- Math.max(MSG_BUF_SIZE, newBufMinSize));
+ Math.max(bufSize, newBufMinSize));
}
/** {@inheritDoc} */