You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2022/11/20 12:47:10 UTC

[incubator-celeborn] branch main updated: [CELEBORN-10] Add Message Support MapPartition (#977)

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 84361887 [CELEBORN-10] Add Message Support MapPartition (#977)
84361887 is described below

commit 843618877b901566613053d1cbcca19b108d680e
Author: zhongqiangczq <96...@users.noreply.github.com>
AuthorDate: Sun Nov 20 20:47:06 2022 +0800

    [CELEBORN-10] Add Message Support MapPartition (#977)
---
 .../celeborn/common/network/protocol/Message.java  |  19 +++-
 .../common/network/protocol/PushDataHandShake.java | 115 ++++++++++++++++++++
 .../common/network/protocol/RegionFinish.java      |  95 +++++++++++++++++
 .../common/network/protocol/RegionStart.java       | 117 +++++++++++++++++++++
 4 files changed, 345 insertions(+), 1 deletion(-)

diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/Message.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/Message.java
index 11eabcee..02061426 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/protocol/Message.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/Message.java
@@ -81,7 +81,10 @@ public abstract class Message implements Encodable {
     STREAM_HANDLE(7),
     ONE_WAY_MESSAGE(9),
     PUSH_DATA(11),
-    PUSH_MERGED_DATA(12);
+    PUSH_MERGED_DATA(12),
+    REGION_START(13),
+    REGION_FINISH(14),
+    PUSH_DATA_HAND_SHAKE(15);
 
     private final byte id;
 
@@ -129,6 +132,12 @@ public abstract class Message implements Encodable {
           return PUSH_DATA;
         case 12:
           return PUSH_MERGED_DATA;
+        case 13:
+          return REGION_START;
+        case 14:
+          return REGION_FINISH;
+        case 15:
+          return PUSH_DATA_HAND_SHAKE;
         case -1:
           throw new IllegalArgumentException("User type messages cannot be decoded.");
         default:
@@ -176,6 +185,14 @@ public abstract class Message implements Encodable {
       case PUSH_MERGED_DATA:
         return PushMergedData.decode(in, decodeBody);
 
+      case REGION_START:
+        return RegionStart.decode(in);
+
+      case REGION_FINISH:
+        return RegionFinish.decode(in);
+
+      case PUSH_DATA_HAND_SHAKE:
+        return PushDataHandShake.decode(in);
       default:
         throw new IllegalArgumentException("Unexpected message type: " + msgType);
     }
diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/PushDataHandShake.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/PushDataHandShake.java
new file mode 100644
index 00000000..d9ce8081
--- /dev/null
+++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/PushDataHandShake.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+public final class PushDataHandShake extends RequestMessage {
+  // 0 for master, 1 for slave, see PartitionLocation.Mode
+  public final byte mode;
+  public final String shuffleKey;
+  public final String partitionUniqueId;
+  public final int attemptId;
+  public final int numPartitions;
+  public final int bufferSize;
+
+  public PushDataHandShake(
+      byte mode,
+      String shuffleKey,
+      String partitionUniqueId,
+      int attemptId,
+      int numSubPartitions,
+      int bufferSize) {
+    this.mode = mode;
+    this.shuffleKey = shuffleKey;
+    this.partitionUniqueId = partitionUniqueId;
+    this.attemptId = attemptId;
+    this.numPartitions = numSubPartitions;
+    this.bufferSize = bufferSize;
+  }
+
+  @Override
+  public Type type() {
+    return Type.PUSH_DATA_HAND_SHAKE;
+  }
+
+  @Override
+  public int encodedLength() {
+    return 1
+        + Encoders.Strings.encodedLength(shuffleKey)
+        + Encoders.Strings.encodedLength(partitionUniqueId)
+        + 4
+        + 4
+        + 4;
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    buf.writeByte(mode);
+    Encoders.Strings.encode(buf, shuffleKey);
+    Encoders.Strings.encode(buf, partitionUniqueId);
+    buf.writeInt(attemptId);
+    buf.writeInt(numPartitions);
+    buf.writeInt(bufferSize);
+  }
+
+  public static PushDataHandShake decode(ByteBuf buf) {
+    byte mode = buf.readByte();
+    String shuffleKey = Encoders.Strings.decode(buf);
+    String partitionUniqueId = Encoders.Strings.decode(buf);
+    int attemptId = buf.readInt();
+    int numPartitions = buf.readInt();
+    int bufferSize = buf.readInt();
+    return new PushDataHandShake(
+        mode, shuffleKey, partitionUniqueId, attemptId, numPartitions, bufferSize);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(
+        mode, shuffleKey, partitionUniqueId, attemptId, numPartitions, bufferSize);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof PushDataHandShake) {
+      PushDataHandShake o = (PushDataHandShake) other;
+      return mode == o.mode
+          && shuffleKey.equals(o.shuffleKey)
+          && partitionUniqueId.equals((o.partitionUniqueId))
+          && attemptId == o.attemptId
+          && numPartitions == o.numPartitions
+          && bufferSize == o.bufferSize
+          && super.equals(o);
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add("mode", mode)
+        .add("shuffleKey", shuffleKey)
+        .add("partitionUniqueId", partitionUniqueId)
+        .add("attemptId", attemptId)
+        .add("numSubPartitions", numPartitions)
+        .add("bufferSize", bufferSize)
+        .toString();
+  }
+}
diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionFinish.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionFinish.java
new file mode 100644
index 00000000..1720399d
--- /dev/null
+++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionFinish.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+public final class RegionFinish extends RequestMessage {
+
+  // 0 for master, 1 for slave, see PartitionLocation.Mode
+  public final byte mode;
+
+  public final String shuffleKey;
+  public final String partitionUniqueId;
+  public final int attemptId;
+
+  public RegionFinish(byte mode, String shuffleKey, String partitionUniqueId, int attemptId) {
+    this.mode = mode;
+    this.shuffleKey = shuffleKey;
+    this.partitionUniqueId = partitionUniqueId;
+    this.attemptId = attemptId;
+  }
+
+  @Override
+  public Type type() {
+    return Type.REGION_FINISH;
+  }
+
+  @Override
+  public int encodedLength() {
+    return 1
+        + Encoders.Strings.encodedLength(shuffleKey)
+        + Encoders.Strings.encodedLength(partitionUniqueId)
+        + 4;
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    buf.writeByte(mode);
+    Encoders.Strings.encode(buf, shuffleKey);
+    Encoders.Strings.encode(buf, partitionUniqueId);
+    buf.writeInt(attemptId);
+  }
+
+  public static RegionFinish decode(ByteBuf buf) {
+    byte mode = buf.readByte();
+    String shuffleKey = Encoders.Strings.decode(buf);
+    String partitionUniqueId = Encoders.Strings.decode(buf);
+    int attemptId = buf.readInt();
+    return new RegionFinish(mode, shuffleKey, partitionUniqueId, attemptId);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(mode, shuffleKey, partitionUniqueId, attemptId);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof RegionFinish) {
+      RegionFinish o = (RegionFinish) other;
+      return mode == o.mode
+          && shuffleKey.equals(o.shuffleKey)
+          && partitionUniqueId.equals((o.partitionUniqueId))
+          && attemptId == o.attemptId
+          && super.equals(o);
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add("mode", mode)
+        .add("shuffleKey", shuffleKey)
+        .add("partitionUniqueId", partitionUniqueId)
+        .add("attemptId", attemptId)
+        .toString();
+  }
+}
diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionStart.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionStart.java
new file mode 100644
index 00000000..f52f6250
--- /dev/null
+++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/RegionStart.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.network.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+public final class RegionStart extends RequestMessage {
+
+  // 0 for master, 1 for slave, see PartitionLocation.Mode
+  public final byte mode;
+
+  public final String shuffleKey;
+  public final String partitionUniqueId;
+  public final int attemptId;
+  public int currentRegionIndex;
+  public Boolean isBroadcast;
+
+  public RegionStart(
+      byte mode,
+      String shuffleKey,
+      String partitionUniqueId,
+      int attemptId,
+      int currentRegionIndex,
+      Boolean isBroadcast) {
+    this.mode = mode;
+    this.shuffleKey = shuffleKey;
+    this.partitionUniqueId = partitionUniqueId;
+    this.attemptId = attemptId;
+    this.currentRegionIndex = currentRegionIndex;
+    this.isBroadcast = isBroadcast;
+  }
+
+  @Override
+  public Type type() {
+    return Type.REGION_START;
+  }
+
+  @Override
+  public int encodedLength() {
+    return 1
+        + Encoders.Strings.encodedLength(shuffleKey)
+        + Encoders.Strings.encodedLength(partitionUniqueId)
+        + 4
+        + 4
+        + 1;
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    buf.writeByte(mode);
+    Encoders.Strings.encode(buf, shuffleKey);
+    Encoders.Strings.encode(buf, partitionUniqueId);
+    buf.writeInt(attemptId);
+    buf.writeInt(currentRegionIndex);
+    buf.writeBoolean(isBroadcast);
+  }
+
+  public static RegionStart decode(ByteBuf buf) {
+    byte mode = buf.readByte();
+    String shuffleKey = Encoders.Strings.decode(buf);
+    String partitionUniqueId = Encoders.Strings.decode(buf);
+    int attemptId = buf.readInt();
+    int currentRegionIndex = buf.readInt();
+    boolean isBroadCast = buf.readBoolean();
+    return new RegionStart(
+        mode, shuffleKey, partitionUniqueId, attemptId, currentRegionIndex, isBroadCast);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(
+        mode, shuffleKey, partitionUniqueId, attemptId, currentRegionIndex, isBroadcast);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof RegionStart) {
+      RegionStart o = (RegionStart) other;
+      return mode == o.mode
+          && shuffleKey.equals(o.shuffleKey)
+          && partitionUniqueId.equals((o.partitionUniqueId))
+          && attemptId == o.attemptId
+          && currentRegionIndex == o.currentRegionIndex
+          && isBroadcast == o.isBroadcast
+          && super.equals(o);
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add("mode", mode)
+        .add("shuffleKey", shuffleKey)
+        .add("partitionUniqueId", partitionUniqueId)
+        .add("attemptId", attemptId)
+        .add("currentRegionIndex", currentRegionIndex)
+        .add("isBroadcast", isBroadcast)
+        .toString();
+  }
+}