You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/06 08:40:43 UTC

[46/50] ignite git commit: IGNITE-4271: Hadoop: shuffle messages now use the "direct-marshallable" path, thus avoiding unnecessary copying, as opposed to the user messages which were used previously. This closes #1266. This closes #1313.

IGNITE-4271: Hadoop: shuffle messages now use the "direct-marshallable" path, thus avoiding unnecessary copying, as opposed to the user messages which were used previously. This closes #1266. This closes #1313.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bc33d19e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bc33d19e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bc33d19e

Branch: refs/heads/ignite-comm-balance-master
Commit: bc33d19ed8702d648c90fe25caff639d3a8e0249
Parents: 530d8c8
Author: devozerov <vo...@gridgain.com>
Authored: Mon Dec 5 15:49:01 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Dec 5 15:49:01 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/GridTopic.java   |   3 +
 .../communication/GridIoMessageFactory.java     |  18 +
 .../internal/processors/hadoop/HadoopJobId.java |  79 +++-
 .../hadoop/message/HadoopMessage.java           |  27 ++
 .../hadoop/shuffle/HadoopShuffleAck.java        | 170 +++++++++
 .../hadoop/shuffle/HadoopShuffleMessage.java    | 361 +++++++++++++++++++
 .../hadoop/message/HadoopMessage.java           |  27 --
 .../hadoop/shuffle/HadoopShuffle.java           |  25 +-
 .../hadoop/shuffle/HadoopShuffleAck.java        |  92 -----
 .../hadoop/shuffle/HadoopShuffleJob.java        |   5 +-
 .../hadoop/shuffle/HadoopShuffleMessage.java    | 242 -------------
 .../child/HadoopChildProcessRunner.java         |   6 +-
 12 files changed, 682 insertions(+), 373 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bc33d19e/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 248f75b..b5608db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -91,6 +91,9 @@ public enum GridTopic {
     TOPIC_HADOOP,
 
     /** */
+    TOPIC_HADOOP_MSG,
+
+    /** */
     TOPIC_QUERY,
 
     /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc33d19e/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index f36191c..dd68984 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -117,6 +117,9 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage;
 import org.apache.ignite.internal.processors.igfs.IgfsAckMessage;
 import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
 import org.apache.ignite.internal.processors.igfs.IgfsBlocksMessage;
@@ -165,6 +168,21 @@ public class GridIoMessageFactory implements MessageFactory {
         Message msg = null;
 
         switch (type) {
+            case -30:
+                msg = new HadoopJobId();
+
+                break;
+
+            case -29:
+                msg = new HadoopShuffleAck();
+
+                break;
+
+            case -28:
+                msg = new HadoopShuffleMessage();
+
+                break;
+
             case -27:
                 msg = new GridDhtTxOnePhaseCommitAckRequest();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc33d19e/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java
index 8c61fab..740ab89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java
@@ -21,14 +21,18 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
 import java.util.UUID;
 import org.apache.ignite.internal.processors.cache.GridCacheInternal;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
 /**
  * Job ID.
  */
-public class HadoopJobId implements GridCacheInternal, Externalizable {
+public class HadoopJobId implements Message, GridCacheInternal, Externalizable {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -63,6 +67,79 @@ public class HadoopJobId implements GridCacheInternal, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // Intentional fall-through: continue from the field given by writer.state().
+            case 0:
+                if (!writer.writeUuid("nodeId", nodeId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeInt("jobId", jobId)) // Field name must match the one used in readFrom().
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                nodeId = reader.readUuid("nodeId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                jobId = reader.readInt("jobId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(HadoopJobId.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -30;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         U.writeUuid(out, nodeId);
         out.writeInt(jobId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc33d19e/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
new file mode 100644
index 0000000..0d7bd3a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.message;
+
+import java.io.Externalizable;
+
+/**
+ * Marker interface for all hadoop messages.
+ */
+public interface HadoopMessage extends Externalizable {
+    // No-op.
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc33d19e/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
new file mode 100644
index 0000000..6dd2c2d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
+
+/**
+ * Acknowledgement message.
+ */
+public class HadoopShuffleAck implements HadoopMessage, Message {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Message ID (presumably the ID of the shuffle message being acknowledged - confirm against sender). */
+    @GridToStringInclude
+    private long msgId;
+
+    /** Job ID; written as a nested message by writeTo() and via Externalizable by writeExternal(). */
+    @GridToStringInclude
+    private HadoopJobId jobId;
+
+    /**
+     *
+     */
+    public HadoopShuffleAck() {
+        // No-op.
+    }
+
+    /**
+     * @param msgId Message ID.
+     */
+    public HadoopShuffleAck(long msgId, HadoopJobId jobId) {
+        assert jobId != null;
+
+        this.msgId = msgId;
+        this.jobId = jobId;
+    }
+
+    /**
+     * @return Message ID.
+     */
+    public long id() {
+        return msgId;
+    }
+
+    /**
+     * @return Job ID.
+     */
+    public HadoopJobId jobId() {
+        return jobId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("msgId", msgId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeMessage("jobId", jobId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                msgId = reader.readLong("msgId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                jobId = reader.readMessage("jobId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(HadoopShuffleAck.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -29;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        jobId.writeExternal(out);
+        out.writeLong(msgId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        jobId = new HadoopJobId();
+
+        jobId.readExternal(in);
+        msgId = in.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopShuffleAck.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc33d19e/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
new file mode 100644
index 0000000..3732bc2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Shuffle message.
+ */
+public class HadoopShuffleMessage implements Message, HadoopMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Counter used by the constructor to assign message IDs (static, so shared by all instances in this JVM). */
+    private static final AtomicLong ids = new AtomicLong();
+
+    /** Marker byte written in front of every key record. */
+    private static final byte MARKER_KEY = (byte)17;
+
+    /** Marker byte written in front of every value record. */
+    private static final byte MARKER_VALUE = (byte)31;
+
+    /** Message ID. */
+    @GridToStringInclude
+    private long msgId;
+
+    /** Job ID. */
+    @GridToStringInclude
+    private HadoopJobId jobId;
+
+    /** Reducer. */
+    @GridToStringInclude
+    private int reducer;
+
+    /** Buffer holding records laid out as: marker byte, 4-byte size, payload. */
+    private byte[] buf;
+
+    /** Offset of the first unused byte in the buffer; advanced by every added record. */
+    @GridToStringInclude
+    private int off;
+
+    /**
+     *
+     */
+    public HadoopShuffleMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param size Size.
+     */
+    public HadoopShuffleMessage(HadoopJobId jobId, int reducer, int size) {
+        assert jobId != null;
+
+        buf = new byte[size];
+
+        this.jobId = jobId;
+        this.reducer = reducer;
+
+        msgId = ids.incrementAndGet();
+    }
+
+    /**
+     * @return Message ID.
+     */
+    public long id() {
+        return msgId;
+    }
+
+    /**
+     * @return Job ID.
+     */
+    public HadoopJobId jobId() {
+        return jobId;
+    }
+
+    /**
+     * @return Reducer.
+     */
+    public int reducer() {
+        return reducer;
+    }
+
+    /**
+     * @return Buffer.
+     */
+    public byte[] buffer() {
+        return buf;
+    }
+
+    /**
+     * @return Offset.
+     */
+    public int offset() {
+        return off;
+    }
+
+    /**
+     * @param size Payload size in bytes; record header overhead is added by this method.
+     * @param valOnly Only value will be added (one record header is reserved instead of two).
+     * @return {@code true} If this message can fit additional data of this size.
+     */
+    public boolean available(int size, boolean valOnly) {
+        size += valOnly ? 5 : 10; // Each record header is 1 marker byte + 4 size bytes, see add().
+
+        if (size > buf.length - off) { // Subtraction form cannot overflow, unlike off + size.
+            if (off == 0) { // Resize if requested size is too big.
+                buf = new byte[size];
+
+                return true;
+            }
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * @param keyPtr Key pointer.
+     * @param keySize Key size.
+     */
+    public void addKey(long keyPtr, int keySize) {
+        add(MARKER_KEY, keyPtr, keySize);
+    }
+
+    /**
+     * @param valPtr Value pointer.
+     * @param valSize Value size.
+     */
+    public void addValue(long valPtr, int valSize) {
+        add(MARKER_VALUE, valPtr, valSize);
+    }
+
+    /**
+     * @param marker Marker.
+     * @param ptr Pointer.
+     * @param size Size.
+     */
+    private void add(byte marker, long ptr, int size) {
+        buf[off++] = marker;
+
+        GridUnsafe.putInt(buf, GridUnsafe.BYTE_ARR_OFF + off, size);
+
+        off += 4;
+
+        GridUnsafe.copyOffheapHeap(ptr, buf, GridUnsafe.BYTE_ARR_OFF + off, size);
+
+        off += size;
+    }
+
+    /**
+     * @param v Visitor called for every key and value record stored in the buffer, in the order they were added.
+     */
+    public void visit(Visitor v) throws IgniteCheckedException {
+        for (int i = 0; i < off;) {
+            byte marker = buf[i++];
+
+            int size = GridUnsafe.getInt(buf, GridUnsafe.BYTE_ARR_OFF + i);
+
+            i += 4;
+
+            if (marker == MARKER_VALUE)
+                v.onValue(buf, i, size);
+            else if (marker == MARKER_KEY)
+                v.onKey(buf, i, size);
+            else
+                throw new IllegalStateException("Unknown marker [marker=" + marker + ", pos=" + (i - 5) + ']');
+
+            i += size;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("msgId", msgId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeMessage("jobId", jobId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeInt("reducer", reducer))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeByteArray("buf", this.buf))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeInt("off", off))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                msgId = reader.readLong("msgId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                jobId = reader.readMessage("jobId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                reducer = reader.readInt("reducer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                this.buf = reader.readByteArray("buf");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                off = reader.readInt("off");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(HadoopShuffleMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -28;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 5;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        jobId.writeExternal(out);
+        out.writeLong(msgId);
+        out.writeInt(reducer);
+        out.writeInt(off);
+        U.writeByteArray(out, buf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        jobId = new HadoopJobId();
+
+        jobId.readExternal(in);
+        msgId = in.readLong();
+        reducer = in.readInt();
+        off = in.readInt();
+        buf = U.readByteArray(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopShuffleMessage.class, this);
+    }
+
+    /**
+     * Visitor.
+     */
+    public static interface Visitor {
+        /**
+         * @param buf Buffer.
+         * @param off Offset.
+         * @param len Length.
+         */
+        public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException;
+
+        /**
+         * @param buf Buffer.
+         * @param off Offset.
+         * @param len Length.
+         */
+        public void onValue(byte[] buf, int off, int len) throws IgniteCheckedException;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc33d19e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
deleted file mode 100644
index 0d7bd3a..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.message;
-
-import java.io.Externalizable;
-
-/**
- * Marker interface for all hadoop messages.
- */
-public interface HadoopMessage extends Externalizable {
-    // No-op.
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc33d19e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index 769bdc4..a69e779 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -17,13 +17,12 @@
 
 package org.apache.ignite.internal.processors.hadoop.shuffle;
 
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
 import org.apache.ignite.internal.processors.hadoop.HadoopContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
@@ -38,6 +37,11 @@ import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 /**
  * Shuffle.
@@ -53,6 +57,12 @@ public class HadoopShuffle extends HadoopComponent {
     @Override public void start(HadoopContext ctx) throws IgniteCheckedException {
         super.start(ctx);
 
+        ctx.kernalContext().io().addMessageListener(GridTopic.TOPIC_HADOOP_MSG, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                onMessageReceived(nodeId, (HadoopMessage)msg);
+            }
+        });
+
         ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP,
             new IgniteBiPredicate<UUID, Object>() {
                 @Override public boolean apply(UUID nodeId, Object msg) {
@@ -117,7 +127,10 @@ public class HadoopShuffle extends HadoopComponent {
     private void send0(UUID nodeId, Object msg) throws IgniteCheckedException {
         ClusterNode node = ctx.kernalContext().discovery().node(nodeId);
 
-        ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0);
+        if (msg instanceof Message)
+            ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL);
+        else
+            ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0);
     }
 
     /**
@@ -151,8 +164,8 @@ public class HadoopShuffle extends HadoopComponent {
      */
     private void startSending(HadoopShuffleJob<UUID> shuffleJob) {
         shuffleJob.startSending(ctx.kernalContext().gridName(),
-            new IgniteInClosure2X<UUID, HadoopShuffleMessage>() {
-                @Override public void applyx(UUID dest, HadoopShuffleMessage msg) throws IgniteCheckedException {
+            new IgniteInClosure2X<UUID, HadoopMessage>() {
+                @Override public void applyx(UUID dest, HadoopMessage msg) throws IgniteCheckedException {
                     send0(dest, msg);
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc33d19e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
deleted file mode 100644
index 6013ec6..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Acknowledgement message.
- */
-public class HadoopShuffleAck implements HadoopMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    @GridToStringInclude
-    private long msgId;
-
-    /** */
-    @GridToStringInclude
-    private HadoopJobId jobId;
-
-    /**
-     *
-     */
-    public HadoopShuffleAck() {
-        // No-op.
-    }
-
-    /**
-     * @param msgId Message ID.
-     */
-    public HadoopShuffleAck(long msgId, HadoopJobId jobId) {
-        assert jobId != null;
-
-        this.msgId = msgId;
-        this.jobId = jobId;
-    }
-
-    /**
-     * @return Message ID.
-     */
-    public long id() {
-        return msgId;
-    }
-
-    /**
-     * @return Job ID.
-     */
-    public HadoopJobId jobId() {
-        return jobId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        jobId.writeExternal(out);
-        out.writeLong(msgId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        jobId = new HadoopJobId();
-
-        jobId.readExternal(in);
-        msgId = in.readLong();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopShuffleAck.class, this);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc33d19e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 025c4da..9392b2c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
 import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimap;
 import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap;
 import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList;
@@ -92,7 +93,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
     private final AtomicReferenceArray<HadoopMultimap> maps;
 
     /** */
-    private volatile IgniteInClosure2X<T, HadoopShuffleMessage> io;
+    private volatile IgniteInClosure2X<T, HadoopMessage> io;
 
     /** */
     protected ConcurrentMap<Long, IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>> sentMsgs =
@@ -176,7 +177,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
      * @param io IO Closure for sending messages.
      */
     @SuppressWarnings("BusyWait")
-    public void startSending(String gridName, IgniteInClosure2X<T, HadoopShuffleMessage> io) {
+    public void startSending(String gridName, IgniteInClosure2X<T, HadoopMessage> io) {
         assert snd == null;
         assert io != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc33d19e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
deleted file mode 100644
index 71a314b..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Shuffle message.
- */
-public class HadoopShuffleMessage implements HadoopMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private static final AtomicLong ids = new AtomicLong();
-
-    /** */
-    private static final byte MARKER_KEY = (byte)17;
-
-    /** */
-    private static final byte MARKER_VALUE = (byte)31;
-
-    /** */
-    @GridToStringInclude
-    private long msgId;
-
-    /** */
-    @GridToStringInclude
-    private HadoopJobId jobId;
-
-    /** */
-    @GridToStringInclude
-    private int reducer;
-
-    /** */
-    private byte[] buf;
-
-    /** */
-    @GridToStringInclude
-    private int off;
-
-    /**
-     *
-     */
-    public HadoopShuffleMessage() {
-        // No-op.
-    }
-
-    /**
-     * @param size Size.
-     */
-    public HadoopShuffleMessage(HadoopJobId jobId, int reducer, int size) {
-        assert jobId != null;
-
-        buf = new byte[size];
-
-        this.jobId = jobId;
-        this.reducer = reducer;
-
-        msgId = ids.incrementAndGet();
-    }
-
-    /**
-     * @return Message ID.
-     */
-    public long id() {
-        return msgId;
-    }
-
-    /**
-     * @return Job ID.
-     */
-    public HadoopJobId jobId() {
-        return jobId;
-    }
-
-    /**
-     * @return Reducer.
-     */
-    public int reducer() {
-        return reducer;
-    }
-
-    /**
-     * @return Buffer.
-     */
-    public byte[] buffer() {
-        return buf;
-    }
-
-    /**
-     * @return Offset.
-     */
-    public int offset() {
-        return off;
-    }
-
-    /**
-     * @param size Size.
-     * @param valOnly Only value wll be added.
-     * @return {@code true} If this message can fit additional data of this size
-     */
-    public boolean available(int size, boolean valOnly) {
-        size += valOnly ? 5 : 10;
-
-        if (off + size > buf.length) {
-            if (off == 0) { // Resize if requested size is too big.
-                buf = new byte[size];
-
-                return true;
-            }
-
-            return false;
-        }
-
-        return true;
-    }
-
-    /**
-     * @param keyPtr Key pointer.
-     * @param keySize Key size.
-     */
-    public void addKey(long keyPtr, int keySize) {
-        add(MARKER_KEY, keyPtr, keySize);
-    }
-
-    /**
-     * @param valPtr Value pointer.
-     * @param valSize Value size.
-     */
-    public void addValue(long valPtr, int valSize) {
-        add(MARKER_VALUE, valPtr, valSize);
-    }
-
-    /**
-     * @param marker Marker.
-     * @param ptr Pointer.
-     * @param size Size.
-     */
-    private void add(byte marker, long ptr, int size) {
-        buf[off++] = marker;
-
-        GridUnsafe.putInt(buf, GridUnsafe.BYTE_ARR_OFF + off, size);
-
-        off += 4;
-
-        GridUnsafe.copyOffheapHeap(ptr, buf, GridUnsafe.BYTE_ARR_OFF + off, size);
-
-        off += size;
-    }
-
-    /**
-     * @param v Visitor.
-     */
-    public void visit(Visitor v) throws IgniteCheckedException {
-        for (int i = 0; i < off;) {
-            byte marker = buf[i++];
-
-            int size = GridUnsafe.getInt(buf, GridUnsafe.BYTE_ARR_OFF + i);
-
-            i += 4;
-
-            if (marker == MARKER_VALUE)
-                v.onValue(buf, i, size);
-            else if (marker == MARKER_KEY)
-                v.onKey(buf, i, size);
-            else
-                throw new IllegalStateException();
-
-            i += size;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        jobId.writeExternal(out);
-        out.writeLong(msgId);
-        out.writeInt(reducer);
-        out.writeInt(off);
-        U.writeByteArray(out, buf);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        jobId = new HadoopJobId();
-
-        jobId.readExternal(in);
-        msgId = in.readLong();
-        reducer = in.readInt();
-        off = in.readInt();
-        buf = U.readByteArray(in);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopShuffleMessage.class, this);
-    }
-
-    /**
-     * Visitor.
-     */
-    public static interface Visitor {
-        /**
-         * @param buf Buffer.
-         * @param off Offset.
-         * @param len Length.
-         */
-        public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException;
-
-        /**
-         * @param buf Buffer.
-         * @param off Offset.
-         * @param len Length.
-         */
-        public void onValue(byte[] buf, int off, int len) throws IgniteCheckedException;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/bc33d19e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
index 45d9a27..7001b8c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
@@ -259,9 +259,9 @@ public class HadoopChildProcessRunner {
                 if (req.reducersAddresses() != null) {
                     if (shuffleJob.initializeReduceAddresses(req.reducersAddresses())) {
                         shuffleJob.startSending("external",
-                            new IgniteInClosure2X<HadoopProcessDescriptor, HadoopShuffleMessage>() {
-                                @Override public void applyx(HadoopProcessDescriptor dest,
-                                    HadoopShuffleMessage msg) throws IgniteCheckedException {
+                            new IgniteInClosure2X<HadoopProcessDescriptor, HadoopMessage>() {
+                                @Override public void applyx(HadoopProcessDescriptor dest, HadoopMessage msg)
+                                    throws IgniteCheckedException {
                                     comm.sendMessage(dest, msg);
                                 }
                             });