You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/22 15:15:59 UTC
[19/50] [abbrv] ignite git commit: IGNITE-4271: Hadoop: shuffle
messages now use "direct-marshallable" path this avoiding unnecessary copying
as opposed to user messages which were used previously. This closes #1266.
This closes #1313.
IGNITE-4271: Hadoop: shuffle messages now use "direct-marshallable" path this avoiding unnecessary copying as opposed to user messages which were used previously. This closes #1266. This closes #1313.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7b97cfd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7b97cfd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7b97cfd
Branch: refs/heads/master
Commit: b7b97cfd3419da4421423e189369a4931efd3996
Parents: 3c38421
Author: devozerov <vo...@gridgain.com>
Authored: Mon Dec 5 15:49:01 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Dec 15 13:45:37 2016 +0300
----------------------------------------------------------------------
.../org/apache/ignite/internal/GridTopic.java | 3 +
.../communication/GridIoMessageFactory.java | 18 +
.../internal/processors/hadoop/HadoopJobId.java | 79 +++-
.../hadoop/message/HadoopMessage.java | 27 ++
.../hadoop/shuffle/HadoopShuffleAck.java | 170 +++++++++
.../hadoop/shuffle/HadoopShuffleMessage.java | 361 +++++++++++++++++++
.../hadoop/message/HadoopMessage.java | 27 --
.../hadoop/shuffle/HadoopShuffle.java | 25 +-
.../hadoop/shuffle/HadoopShuffleAck.java | 92 -----
.../hadoop/shuffle/HadoopShuffleJob.java | 5 +-
.../hadoop/shuffle/HadoopShuffleMessage.java | 242 -------------
.../child/HadoopChildProcessRunner.java | 6 +-
12 files changed, 682 insertions(+), 373 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 248f75b..b5608db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -91,6 +91,9 @@ public enum GridTopic {
TOPIC_HADOOP,
/** */
+ TOPIC_HADOOP_MSG,
+
+ /** */
TOPIC_QUERY,
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index f36191c..dd68984 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -117,6 +117,9 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage;
import org.apache.ignite.internal.processors.igfs.IgfsAckMessage;
import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
import org.apache.ignite.internal.processors.igfs.IgfsBlocksMessage;
@@ -165,6 +168,21 @@ public class GridIoMessageFactory implements MessageFactory {
Message msg = null;
switch (type) {
+ case -30:
+ msg = new HadoopJobId();
+
+ break;
+
+ case -29:
+ msg = new HadoopShuffleAck();
+
+ break;
+
+ case -28:
+ msg = new HadoopShuffleMessage();
+
+ break;
+
case -27:
msg = new GridDhtTxOnePhaseCommitAckRequest();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java
index 8c61fab..740ab89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobId.java
@@ -21,14 +21,18 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.ignite.internal.processors.cache.GridCacheInternal;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
* Job ID.
*/
-public class HadoopJobId implements GridCacheInternal, Externalizable {
+public class HadoopJobId implements Message, GridCacheInternal, Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -63,6 +67,79 @@ public class HadoopJobId implements GridCacheInternal, Externalizable {
}
/** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeUuid("nodeId", nodeId))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeInt("localId", jobId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ nodeId = reader.readUuid("nodeId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ jobId = reader.readInt("jobId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(HadoopJobId.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -30;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeUuid(out, nodeId);
out.writeInt(jobId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
new file mode 100644
index 0000000..0d7bd3a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.message;
+
+import java.io.Externalizable;
+
+/**
+ * Marker interface for all hadoop messages.
+ */
+public interface HadoopMessage extends Externalizable {
+ // No-op.
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
new file mode 100644
index 0000000..6dd2c2d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
+
+/**
+ * Acknowledgement message.
+ */
+public class HadoopShuffleAck implements HadoopMessage, Message {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ @GridToStringInclude
+ private long msgId;
+
+ /** */
+ @GridToStringInclude
+ private HadoopJobId jobId;
+
+ /**
+ *
+ */
+ public HadoopShuffleAck() {
+ // No-op.
+ }
+
+ /**
+ * @param msgId Message ID.
+ */
+ public HadoopShuffleAck(long msgId, HadoopJobId jobId) {
+ assert jobId != null;
+
+ this.msgId = msgId;
+ this.jobId = jobId;
+ }
+
+ /**
+ * @return Message ID.
+ */
+ public long id() {
+ return msgId;
+ }
+
+ /**
+ * @return Job ID.
+ */
+ public HadoopJobId jobId() {
+ return jobId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeLong("msgId", msgId))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeMessage("jobId", jobId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ msgId = reader.readLong("msgId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ jobId = reader.readMessage("jobId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(HadoopShuffleAck.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -29;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ jobId.writeExternal(out);
+ out.writeLong(msgId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ jobId = new HadoopJobId();
+
+ jobId.readExternal(in);
+ msgId = in.readLong();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopShuffleAck.class, this);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
new file mode 100644
index 0000000..3732bc2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Shuffle message.
+ */
+public class HadoopShuffleMessage implements Message, HadoopMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private static final AtomicLong ids = new AtomicLong();
+
+ /** */
+ private static final byte MARKER_KEY = (byte)17;
+
+ /** */
+ private static final byte MARKER_VALUE = (byte)31;
+
+ /** */
+ @GridToStringInclude
+ private long msgId;
+
+ /** */
+ @GridToStringInclude
+ private HadoopJobId jobId;
+
+ /** */
+ @GridToStringInclude
+ private int reducer;
+
+ /** */
+ private byte[] buf;
+
+ /** */
+ @GridToStringInclude
+ private int off;
+
+ /**
+ *
+ */
+ public HadoopShuffleMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param size Size.
+ */
+ public HadoopShuffleMessage(HadoopJobId jobId, int reducer, int size) {
+ assert jobId != null;
+
+ buf = new byte[size];
+
+ this.jobId = jobId;
+ this.reducer = reducer;
+
+ msgId = ids.incrementAndGet();
+ }
+
+ /**
+ * @return Message ID.
+ */
+ public long id() {
+ return msgId;
+ }
+
+ /**
+ * @return Job ID.
+ */
+ public HadoopJobId jobId() {
+ return jobId;
+ }
+
+ /**
+ * @return Reducer.
+ */
+ public int reducer() {
+ return reducer;
+ }
+
+ /**
+ * @return Buffer.
+ */
+ public byte[] buffer() {
+ return buf;
+ }
+
+ /**
+ * @return Offset.
+ */
+ public int offset() {
+ return off;
+ }
+
+ /**
+ * @param size Size.
+ * @param valOnly Only value wll be added.
+ * @return {@code true} If this message can fit additional data of this size
+ */
+ public boolean available(int size, boolean valOnly) {
+ size += valOnly ? 5 : 10;
+
+ if (off + size > buf.length) {
+ if (off == 0) { // Resize if requested size is too big.
+ buf = new byte[size];
+
+ return true;
+ }
+
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * @param keyPtr Key pointer.
+ * @param keySize Key size.
+ */
+ public void addKey(long keyPtr, int keySize) {
+ add(MARKER_KEY, keyPtr, keySize);
+ }
+
+ /**
+ * @param valPtr Value pointer.
+ * @param valSize Value size.
+ */
+ public void addValue(long valPtr, int valSize) {
+ add(MARKER_VALUE, valPtr, valSize);
+ }
+
+ /**
+ * @param marker Marker.
+ * @param ptr Pointer.
+ * @param size Size.
+ */
+ private void add(byte marker, long ptr, int size) {
+ buf[off++] = marker;
+
+ GridUnsafe.putInt(buf, GridUnsafe.BYTE_ARR_OFF + off, size);
+
+ off += 4;
+
+ GridUnsafe.copyOffheapHeap(ptr, buf, GridUnsafe.BYTE_ARR_OFF + off, size);
+
+ off += size;
+ }
+
+ /**
+ * @param v Visitor.
+ */
+ public void visit(Visitor v) throws IgniteCheckedException {
+ for (int i = 0; i < off;) {
+ byte marker = buf[i++];
+
+ int size = GridUnsafe.getInt(buf, GridUnsafe.BYTE_ARR_OFF + i);
+
+ i += 4;
+
+ if (marker == MARKER_VALUE)
+ v.onValue(buf, i, size);
+ else if (marker == MARKER_KEY)
+ v.onKey(buf, i, size);
+ else
+ throw new IllegalStateException();
+
+ i += size;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeLong("msgId", msgId))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeMessage("jobId", jobId))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeInt("reducer", reducer))
+ return false;
+
+ writer.incrementState();
+
+ case 3:
+ if (!writer.writeByteArray("buf", this.buf))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeInt("off", off))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ msgId = reader.readLong("msgId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ jobId = reader.readMessage("jobId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ reducer = reader.readInt("reducer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 3:
+ this.buf = reader.readByteArray("buf");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ off = reader.readInt("off");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(HadoopShuffleMessage.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -28;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 5;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ jobId.writeExternal(out);
+ out.writeLong(msgId);
+ out.writeInt(reducer);
+ out.writeInt(off);
+ U.writeByteArray(out, buf);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ jobId = new HadoopJobId();
+
+ jobId.readExternal(in);
+ msgId = in.readLong();
+ reducer = in.readInt();
+ off = in.readInt();
+ buf = U.readByteArray(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopShuffleMessage.class, this);
+ }
+
+ /**
+ * Visitor.
+ */
+ public static interface Visitor {
+ /**
+ * @param buf Buffer.
+ * @param off Offset.
+ * @param len Length.
+ */
+ public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException;
+
+ /**
+ * @param buf Buffer.
+ * @param off Offset.
+ * @param len Length.
+ */
+ public void onValue(byte[] buf, int off, int len) throws IgniteCheckedException;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
deleted file mode 100644
index 0d7bd3a..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.message;
-
-import java.io.Externalizable;
-
-/**
- * Marker interface for all hadoop messages.
- */
-public interface HadoopMessage extends Externalizable {
- // No-op.
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index 769bdc4..a69e779 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -17,13 +17,12 @@
package org.apache.ignite.internal.processors.hadoop.shuffle;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
import org.apache.ignite.internal.processors.hadoop.HadoopContext;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
@@ -38,6 +37,11 @@ import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
/**
* Shuffle.
@@ -53,6 +57,12 @@ public class HadoopShuffle extends HadoopComponent {
@Override public void start(HadoopContext ctx) throws IgniteCheckedException {
super.start(ctx);
+ ctx.kernalContext().io().addMessageListener(GridTopic.TOPIC_HADOOP_MSG, new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ onMessageReceived(nodeId, (HadoopMessage)msg);
+ }
+ });
+
ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP,
new IgniteBiPredicate<UUID, Object>() {
@Override public boolean apply(UUID nodeId, Object msg) {
@@ -117,7 +127,10 @@ public class HadoopShuffle extends HadoopComponent {
private void send0(UUID nodeId, Object msg) throws IgniteCheckedException {
ClusterNode node = ctx.kernalContext().discovery().node(nodeId);
- ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0);
+ if (msg instanceof Message)
+ ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL);
+ else
+ ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0);
}
/**
@@ -151,8 +164,8 @@ public class HadoopShuffle extends HadoopComponent {
*/
private void startSending(HadoopShuffleJob<UUID> shuffleJob) {
shuffleJob.startSending(ctx.kernalContext().gridName(),
- new IgniteInClosure2X<UUID, HadoopShuffleMessage>() {
- @Override public void applyx(UUID dest, HadoopShuffleMessage msg) throws IgniteCheckedException {
+ new IgniteInClosure2X<UUID, HadoopMessage>() {
+ @Override public void applyx(UUID dest, HadoopMessage msg) throws IgniteCheckedException {
send0(dest, msg);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
deleted file mode 100644
index 6013ec6..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Acknowledgement message.
- */
-public class HadoopShuffleAck implements HadoopMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- @GridToStringInclude
- private long msgId;
-
- /** */
- @GridToStringInclude
- private HadoopJobId jobId;
-
- /**
- *
- */
- public HadoopShuffleAck() {
- // No-op.
- }
-
- /**
- * @param msgId Message ID.
- */
- public HadoopShuffleAck(long msgId, HadoopJobId jobId) {
- assert jobId != null;
-
- this.msgId = msgId;
- this.jobId = jobId;
- }
-
- /**
- * @return Message ID.
- */
- public long id() {
- return msgId;
- }
-
- /**
- * @return Job ID.
- */
- public HadoopJobId jobId() {
- return jobId;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- jobId.writeExternal(out);
- out.writeLong(msgId);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- jobId = new HadoopJobId();
-
- jobId.readExternal(in);
- msgId = in.readLong();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopShuffleAck.class, this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 025c4da..9392b2c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimap;
import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap;
import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList;
@@ -92,7 +93,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
private final AtomicReferenceArray<HadoopMultimap> maps;
/** */
- private volatile IgniteInClosure2X<T, HadoopShuffleMessage> io;
+ private volatile IgniteInClosure2X<T, HadoopMessage> io;
/** */
protected ConcurrentMap<Long, IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>> sentMsgs =
@@ -176,7 +177,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
* @param io IO Closure for sending messages.
*/
@SuppressWarnings("BusyWait")
- public void startSending(String gridName, IgniteInClosure2X<T, HadoopShuffleMessage> io) {
+ public void startSending(String gridName, IgniteInClosure2X<T, HadoopMessage> io) {
assert snd == null;
assert io != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
deleted file mode 100644
index 71a314b..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Shuffle message.
- */
-public class HadoopShuffleMessage implements HadoopMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private static final AtomicLong ids = new AtomicLong();
-
- /** */
- private static final byte MARKER_KEY = (byte)17;
-
- /** */
- private static final byte MARKER_VALUE = (byte)31;
-
- /** */
- @GridToStringInclude
- private long msgId;
-
- /** */
- @GridToStringInclude
- private HadoopJobId jobId;
-
- /** */
- @GridToStringInclude
- private int reducer;
-
- /** */
- private byte[] buf;
-
- /** */
- @GridToStringInclude
- private int off;
-
- /**
- *
- */
- public HadoopShuffleMessage() {
- // No-op.
- }
-
- /**
- * @param size Size.
- */
- public HadoopShuffleMessage(HadoopJobId jobId, int reducer, int size) {
- assert jobId != null;
-
- buf = new byte[size];
-
- this.jobId = jobId;
- this.reducer = reducer;
-
- msgId = ids.incrementAndGet();
- }
-
- /**
- * @return Message ID.
- */
- public long id() {
- return msgId;
- }
-
- /**
- * @return Job ID.
- */
- public HadoopJobId jobId() {
- return jobId;
- }
-
- /**
- * @return Reducer.
- */
- public int reducer() {
- return reducer;
- }
-
- /**
- * @return Buffer.
- */
- public byte[] buffer() {
- return buf;
- }
-
- /**
- * @return Offset.
- */
- public int offset() {
- return off;
- }
-
- /**
- * @param size Size.
- * @param valOnly Only value wll be added.
- * @return {@code true} If this message can fit additional data of this size
- */
- public boolean available(int size, boolean valOnly) {
- size += valOnly ? 5 : 10;
-
- if (off + size > buf.length) {
- if (off == 0) { // Resize if requested size is too big.
- buf = new byte[size];
-
- return true;
- }
-
- return false;
- }
-
- return true;
- }
-
- /**
- * @param keyPtr Key pointer.
- * @param keySize Key size.
- */
- public void addKey(long keyPtr, int keySize) {
- add(MARKER_KEY, keyPtr, keySize);
- }
-
- /**
- * @param valPtr Value pointer.
- * @param valSize Value size.
- */
- public void addValue(long valPtr, int valSize) {
- add(MARKER_VALUE, valPtr, valSize);
- }
-
- /**
- * @param marker Marker.
- * @param ptr Pointer.
- * @param size Size.
- */
- private void add(byte marker, long ptr, int size) {
- buf[off++] = marker;
-
- GridUnsafe.putInt(buf, GridUnsafe.BYTE_ARR_OFF + off, size);
-
- off += 4;
-
- GridUnsafe.copyOffheapHeap(ptr, buf, GridUnsafe.BYTE_ARR_OFF + off, size);
-
- off += size;
- }
-
- /**
- * @param v Visitor.
- */
- public void visit(Visitor v) throws IgniteCheckedException {
- for (int i = 0; i < off;) {
- byte marker = buf[i++];
-
- int size = GridUnsafe.getInt(buf, GridUnsafe.BYTE_ARR_OFF + i);
-
- i += 4;
-
- if (marker == MARKER_VALUE)
- v.onValue(buf, i, size);
- else if (marker == MARKER_KEY)
- v.onKey(buf, i, size);
- else
- throw new IllegalStateException();
-
- i += size;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- jobId.writeExternal(out);
- out.writeLong(msgId);
- out.writeInt(reducer);
- out.writeInt(off);
- U.writeByteArray(out, buf);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- jobId = new HadoopJobId();
-
- jobId.readExternal(in);
- msgId = in.readLong();
- reducer = in.readInt();
- off = in.readInt();
- buf = U.readByteArray(in);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopShuffleMessage.class, this);
- }
-
- /**
- * Visitor.
- */
- public static interface Visitor {
- /**
- * @param buf Buffer.
- * @param off Offset.
- * @param len Length.
- */
- public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException;
-
- /**
- * @param buf Buffer.
- * @param off Offset.
- * @param len Length.
- */
- public void onValue(byte[] buf, int off, int len) throws IgniteCheckedException;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b7b97cfd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
index 45d9a27..7001b8c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
@@ -259,9 +259,9 @@ public class HadoopChildProcessRunner {
if (req.reducersAddresses() != null) {
if (shuffleJob.initializeReduceAddresses(req.reducersAddresses())) {
shuffleJob.startSending("external",
- new IgniteInClosure2X<HadoopProcessDescriptor, HadoopShuffleMessage>() {
- @Override public void applyx(HadoopProcessDescriptor dest,
- HadoopShuffleMessage msg) throws IgniteCheckedException {
+ new IgniteInClosure2X<HadoopProcessDescriptor, HadoopMessage>() {
+ @Override public void applyx(HadoopProcessDescriptor dest, HadoopMessage msg)
+ throws IgniteCheckedException {
comm.sendMessage(dest, msg);
}
});