You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2022/11/20 12:47:10 UTC
[incubator-celeborn] branch main updated: [CELEBORN-10] Add Message Support MapPartition (#977)
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 84361887 [CELEBORN-10] Add Message Support MapPartition (#977)
84361887 is described below
commit 843618877b901566613053d1cbcca19b108d680e
Author: zhongqiangczq <96...@users.noreply.github.com>
AuthorDate: Sun Nov 20 20:47:06 2022 +0800
[CELEBORN-10] Add Message Support MapPartition (#977)
---
.../celeborn/common/network/protocol/Message.java | 19 +++-
.../common/network/protocol/PushDataHandShake.java | 115 ++++++++++++++++++++
.../common/network/protocol/RegionFinish.java | 95 +++++++++++++++++
.../common/network/protocol/RegionStart.java | 117 +++++++++++++++++++++
4 files changed, 345 insertions(+), 1 deletion(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/Message.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/Message.java
index 11eabcee..02061426 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/protocol/Message.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/Message.java
@@ -81,7 +81,10 @@ public abstract class Message implements Encodable {
STREAM_HANDLE(7),
ONE_WAY_MESSAGE(9),
PUSH_DATA(11),
- PUSH_MERGED_DATA(12);
+ PUSH_MERGED_DATA(12),
+ REGION_START(13),
+ REGION_FINISH(14),
+ PUSH_DATA_HAND_SHAKE(15);
private final byte id;
@@ -129,6 +132,12 @@ public abstract class Message implements Encodable {
return PUSH_DATA;
case 12:
return PUSH_MERGED_DATA;
+ case 13:
+ return REGION_START;
+ case 14:
+ return REGION_FINISH;
+ case 15:
+ return PUSH_DATA_HAND_SHAKE;
case -1:
throw new IllegalArgumentException("User type messages cannot be decoded.");
default:
@@ -176,6 +185,14 @@ public abstract class Message implements Encodable {
case PUSH_MERGED_DATA:
return PushMergedData.decode(in, decodeBody);
+ case REGION_START:
+ return RegionStart.decode(in);
+
+ case REGION_FINISH:
+ return RegionFinish.decode(in);
+
+ case PUSH_DATA_HAND_SHAKE:
+ return PushDataHandShake.decode(in);
default:
throw new IllegalArgumentException("Unexpected message type: " + msgType);
}
diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/PushDataHandShake.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/PushDataHandShake.java
new file mode 100644
index 00000000..d9ce8081
--- /dev/null
+++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/PushDataHandShake.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+public final class PushDataHandShake extends RequestMessage { // Request message of type PUSH_DATA_HAND_SHAKE.
+ // 0 for master, 1 for slave, see PartitionLocation.Mode
+ public final byte mode;
+ public final String shuffleKey;
+ public final String partitionUniqueId;
+ public final int attemptId;
+ public final int numPartitions; // populated from the constructor's numSubPartitions argument
+ public final int bufferSize;
+
+ public PushDataHandShake( // Stores each argument in the corresponding public field.
+ byte mode,
+ String shuffleKey,
+ String partitionUniqueId,
+ int attemptId,
+ int numSubPartitions, // kept in the numPartitions field
+ int bufferSize) {
+ this.mode = mode;
+ this.shuffleKey = shuffleKey;
+ this.partitionUniqueId = partitionUniqueId;
+ this.attemptId = attemptId;
+ this.numPartitions = numSubPartitions;
+ this.bufferSize = bufferSize;
+ }
+
+ @Override
+ public Type type() { // Identifies this message as Type.PUSH_DATA_HAND_SHAKE.
+ return Type.PUSH_DATA_HAND_SHAKE;
+ }
+
+ @Override
+ public int encodedLength() { // Sum of the per-field sizes, listed in the order encode() writes them.
+ return 1 // mode (written with writeByte)
+ + Encoders.Strings.encodedLength(shuffleKey)
+ + Encoders.Strings.encodedLength(partitionUniqueId)
+ + 4 // attemptId
+ + 4 // numPartitions
+ + 4; // bufferSize
+ }
+
+ @Override
+ public void encode(ByteBuf buf) { // Writes every field to buf; decode() reads them back in this same order.
+ buf.writeByte(mode);
+ Encoders.Strings.encode(buf, shuffleKey);
+ Encoders.Strings.encode(buf, partitionUniqueId);
+ buf.writeInt(attemptId);
+ buf.writeInt(numPartitions);
+ buf.writeInt(bufferSize);
+ }
+
+ public static PushDataHandShake decode(ByteBuf buf) { // Reads the fields in the order encode() wrote them and builds a new message.
+ byte mode = buf.readByte();
+ String shuffleKey = Encoders.Strings.decode(buf);
+ String partitionUniqueId = Encoders.Strings.decode(buf);
+ int attemptId = buf.readInt();
+ int numPartitions = buf.readInt();
+ int bufferSize = buf.readInt();
+ return new PushDataHandShake(
+ mode, shuffleKey, partitionUniqueId, attemptId, numPartitions, bufferSize);
+ }
+
+ @Override
+ public int hashCode() { // Hashes the same six fields that equals() compares.
+ return Objects.hashCode(
+ mode, shuffleKey, partitionUniqueId, attemptId, numPartitions, bufferSize);
+ }
+
+ @Override
+ public boolean equals(Object other) { // Field-by-field comparison; additionally requires super.equals(o).
+ if (other instanceof PushDataHandShake) {
+ PushDataHandShake o = (PushDataHandShake) other;
+ return mode == o.mode
+ && shuffleKey.equals(o.shuffleKey)
+ && partitionUniqueId.equals((o.partitionUniqueId))
+ && attemptId == o.attemptId
+ && numPartitions == o.numPartitions
+ && bufferSize == o.bufferSize
+ && super.equals(o);
+ }
+ return false; // any object that is not a PushDataHandShake (including null) is unequal
+ }
+
+ @Override
+ public String toString() { // Renders all six fields through Guava's toStringHelper.
+ return Objects.toStringHelper(this)
+ .add("mode", mode)
+ .add("shuffleKey", shuffleKey)
+ .add("partitionUniqueId", partitionUniqueId)
+ .add("attemptId", attemptId)
+ .add("numSubPartitions", numPartitions) // NOTE(review): label differs from the field name numPartitions - confirm intended
+ .add("bufferSize", bufferSize)
+ .toString();
+ }
+}
diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionFinish.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionFinish.java
new file mode 100644
index 00000000..1720399d
--- /dev/null
+++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionFinish.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+public final class RegionFinish extends RequestMessage { // Request message of type REGION_FINISH.
+
+ // 0 for master, 1 for slave, see PartitionLocation.Mode
+ public final byte mode;
+
+ public final String shuffleKey;
+ public final String partitionUniqueId;
+ public final int attemptId;
+
+ public RegionFinish(byte mode, String shuffleKey, String partitionUniqueId, int attemptId) { // Stores each argument in the public field of the same name.
+ this.mode = mode;
+ this.shuffleKey = shuffleKey;
+ this.partitionUniqueId = partitionUniqueId;
+ this.attemptId = attemptId;
+ }
+
+ @Override
+ public Type type() { // Identifies this message as Type.REGION_FINISH.
+ return Type.REGION_FINISH;
+ }
+
+ @Override
+ public int encodedLength() { // Sum of the per-field sizes, listed in the order encode() writes them.
+ return 1 // mode (written with writeByte)
+ + Encoders.Strings.encodedLength(shuffleKey)
+ + Encoders.Strings.encodedLength(partitionUniqueId)
+ + 4; // attemptId
+ }
+
+ @Override
+ public void encode(ByteBuf buf) { // Writes every field to buf; decode() reads them back in this same order.
+ buf.writeByte(mode);
+ Encoders.Strings.encode(buf, shuffleKey);
+ Encoders.Strings.encode(buf, partitionUniqueId);
+ buf.writeInt(attemptId);
+ }
+
+ public static RegionFinish decode(ByteBuf buf) { // Reads the fields in the order encode() wrote them and builds a new message.
+ byte mode = buf.readByte();
+ String shuffleKey = Encoders.Strings.decode(buf);
+ String partitionUniqueId = Encoders.Strings.decode(buf);
+ int attemptId = buf.readInt();
+ return new RegionFinish(mode, shuffleKey, partitionUniqueId, attemptId);
+ }
+
+ @Override
+ public int hashCode() { // Hashes the same four fields that equals() compares.
+ return Objects.hashCode(mode, shuffleKey, partitionUniqueId, attemptId);
+ }
+
+ @Override
+ public boolean equals(Object other) { // Field-by-field comparison; additionally requires super.equals(o).
+ if (other instanceof RegionFinish) {
+ RegionFinish o = (RegionFinish) other;
+ return mode == o.mode
+ && shuffleKey.equals(o.shuffleKey)
+ && partitionUniqueId.equals((o.partitionUniqueId))
+ && attemptId == o.attemptId
+ && super.equals(o);
+ }
+ return false; // any object that is not a RegionFinish (including null) is unequal
+ }
+
+ @Override
+ public String toString() { // Renders all four fields through Guava's toStringHelper.
+ return Objects.toStringHelper(this)
+ .add("mode", mode)
+ .add("shuffleKey", shuffleKey)
+ .add("partitionUniqueId", partitionUniqueId)
+ .add("attemptId", attemptId)
+ .toString();
+ }
+}
diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionStart.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionStart.java
new file mode 100644
index 00000000..f52f6250
--- /dev/null
+++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionStart.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+public final class RegionStart extends RequestMessage { // Request message of type REGION_START.
+
+ // 0 for master, 1 for slave, see PartitionLocation.Mode
+ public final byte mode;
+
+ public final String shuffleKey;
+ public final String partitionUniqueId;
+ public final int attemptId;
+ public int currentRegionIndex; // NOTE(review): not final, unlike the fields above - confirm mutation is intended
+ public Boolean isBroadcast; // boxed and not final; encode() unboxes it, so it must be non-null before encoding
+
+ public RegionStart( // Stores each argument in the public field of the same name.
+ byte mode,
+ String shuffleKey,
+ String partitionUniqueId,
+ int attemptId,
+ int currentRegionIndex,
+ Boolean isBroadcast) {
+ this.mode = mode;
+ this.shuffleKey = shuffleKey;
+ this.partitionUniqueId = partitionUniqueId;
+ this.attemptId = attemptId;
+ this.currentRegionIndex = currentRegionIndex;
+ this.isBroadcast = isBroadcast;
+ }
+
+ @Override
+ public Type type() { // Identifies this message as Type.REGION_START.
+ return Type.REGION_START;
+ }
+
+ @Override
+ public int encodedLength() { // Sum of the per-field sizes, listed in the order encode() writes them.
+ return 1 // mode (written with writeByte)
+ + Encoders.Strings.encodedLength(shuffleKey)
+ + Encoders.Strings.encodedLength(partitionUniqueId)
+ + 4 // attemptId
+ + 4 // currentRegionIndex
+ + 1; // isBroadcast (written with writeBoolean)
+ }
+
+ @Override
+ public void encode(ByteBuf buf) { // Writes every field to buf; decode() reads them back in this same order.
+ buf.writeByte(mode);
+ Encoders.Strings.encode(buf, shuffleKey);
+ Encoders.Strings.encode(buf, partitionUniqueId);
+ buf.writeInt(attemptId);
+ buf.writeInt(currentRegionIndex);
+ buf.writeBoolean(isBroadcast); // auto-unboxes: throws NullPointerException when isBroadcast is null
+ }
+
+ public static RegionStart decode(ByteBuf buf) { // Reads the fields in the order encode() wrote them and builds a new message.
+ byte mode = buf.readByte();
+ String shuffleKey = Encoders.Strings.decode(buf);
+ String partitionUniqueId = Encoders.Strings.decode(buf);
+ int attemptId = buf.readInt();
+ int currentRegionIndex = buf.readInt();
+ boolean isBroadcast = buf.readBoolean();
+ return new RegionStart(
+ mode, shuffleKey, partitionUniqueId, attemptId, currentRegionIndex, isBroadcast);
+ }
+
+ @Override
+ public int hashCode() { // Hashes the same six fields that equals() compares.
+ return Objects.hashCode(
+ mode, shuffleKey, partitionUniqueId, attemptId, currentRegionIndex, isBroadcast);
+ }
+
+ @Override
+ public boolean equals(Object other) { // Field-by-field comparison; additionally requires super.equals(o).
+ if (other instanceof RegionStart) {
+ RegionStart o = (RegionStart) other;
+ return mode == o.mode
+ && shuffleKey.equals(o.shuffleKey)
+ && partitionUniqueId.equals((o.partitionUniqueId))
+ && attemptId == o.attemptId
+ && currentRegionIndex == o.currentRegionIndex
+ && Objects.equal(isBroadcast, o.isBroadcast) // value comparison: == on boxed Boolean tests reference identity; also null-safe
+ && super.equals(o);
+ }
+ return false; // any object that is not a RegionStart (including null) is unequal
+ }
+
+ @Override
+ public String toString() { // Renders all six fields through Guava's toStringHelper.
+ return Objects.toStringHelper(this)
+ .add("mode", mode)
+ .add("shuffleKey", shuffleKey)
+ .add("partitionUniqueId", partitionUniqueId)
+ .add("attemptId", attemptId)
+ .add("currentRegionIndex", currentRegionIndex)
+ .add("isBroadcast", isBroadcast)
+ .toString();
+ }
+}