You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/11/21 14:34:15 UTC

ignite git commit: Other fixes.

Repository: ignite
Updated Branches:
  refs/heads/ignite-1.6.10-hadoop-debug c2bff238e -> 5c733d5a9


Other fixes.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5c733d5a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5c733d5a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5c733d5a

Branch: refs/heads/ignite-1.6.10-hadoop-debug
Commit: 5c733d5a9613a1893cdf2f66f5df27db61618d22
Parents: c2bff23
Author: devozerov <vo...@gridgain.com>
Authored: Mon Nov 21 17:34:02 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Nov 21 17:34:02 2016 +0300

----------------------------------------------------------------------
 .../hadoop/shuffle/HadoopShuffle.java           | 30 +++++++++++++++-----
 .../hadoop/shuffle/HadoopShuffleJob.java        | 22 ++++++++++++--
 2 files changed, 43 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5c733d5a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index 23328ba..c1d8cfd 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -17,9 +17,6 @@
 
 package org.apache.ignite.internal.processors.hadoop.shuffle;
 
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridTopic;
@@ -35,12 +32,18 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
 import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.GridPlainCallable;
 import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 /**
  * Shuffle.
@@ -98,8 +101,10 @@ public class HadoopShuffle extends HadoopComponent {
     private HadoopShuffleJob<UUID> newJob(HadoopJobId jobId) throws IgniteCheckedException {
         HadoopMapReducePlan plan = ctx.jobTracker().plan(jobId);
 
+        int bufSize = ((TcpCommunicationSpi)ctx.kernalContext().config().getCommunicationSpi()).getSocketSendBuffer();
+
         HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log,
-            ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()));
+            ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()), bufSize);
 
         UUID[] rdcAddrs = new UUID[plan.reducers()];
 
@@ -123,11 +128,22 @@ public class HadoopShuffle extends HadoopComponent {
      * @param msg Message to send.
      * @throws IgniteCheckedException If send failed.
      */
-    private void send0(UUID nodeId, Object msg) throws IgniteCheckedException {
+    private void send0(final UUID nodeId, final Object msg) throws IgniteCheckedException {
         ClusterNode node = ctx.kernalContext().discovery().node(nodeId);
 
-        if (msg instanceof Message)
-            ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP2, (Message)msg, GridIoPolicy.PUBLIC_POOL);
+        if (msg instanceof Message) {
+            if (msg instanceof HadoopShuffleMessage && F.eq(nodeId, ctx.localNodeId())) {
+                ctx.kernalContext().closure().callLocalSafe(new GridPlainCallable<Void>() {
+                    @Override public Void call() throws Exception {
+                        onMessageReceived(nodeId, (HadoopMessage)msg);
+
+                        return null;
+                    }
+                }, false);
+            }
+            else
+                ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP2, (Message) msg, GridIoPolicy.PUBLIC_POOL);
+        }
         else
             ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5c733d5a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 1411b36..f586234 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -108,6 +108,9 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
     /** */
     private final IgniteLogger log;
 
+    /** Buffer size. */
+    private final int bufSize;
+
     /**
      * @param locReduceAddr Local reducer address.
      * @param log Logger.
@@ -119,10 +122,25 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
      */
     public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem,
         int totalReducerCnt, int[] locReducers) throws IgniteCheckedException {
+        this(locReduceAddr, log, job, mem, totalReducerCnt, locReducers, MSG_BUF_SIZE);
+    }
+
+    /**
+     * @param locReduceAddr Local reducer address.
+     * @param log Logger.
+     * @param job Job.
+     * @param mem Memory.
+     * @param totalReducerCnt Amount of reducers in the Job.
+     * @param locReducers Reducers will work on current node.
+     * @throws IgniteCheckedException If error.
+     */
+    public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem,
+        int totalReducerCnt, int[] locReducers, int bufSize) throws IgniteCheckedException {
         this.locReduceAddr = locReduceAddr;
         this.job = job;
         this.mem = mem;
         this.log = log.getLogger(HadoopShuffleJob.class);
+        this.bufSize = bufSize;
 
         if (!F.isEmpty(locReducers)) {
             for (int rdc : locReducers) {
@@ -311,7 +329,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
                 continue; // Skip empty map and local node.
 
             if (msgs[i] == null)
-                msgs[i] = new HadoopShuffleMessage(job.id(), i, MSG_BUF_SIZE);
+                msgs[i] = new HadoopShuffleMessage(job.id(), i, bufSize);
 
             final int idx = i;
 
@@ -416,7 +434,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
         });
 
         msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx,
-            Math.max(MSG_BUF_SIZE, newBufMinSize));
+            Math.max(bufSize, newBufMinSize));
     }
 
     /** {@inheritDoc} */