Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/09/24 13:48:59 UTC

[GitHub] [spark] Ngone51 commented on a change in pull request #29855: [SPARK-32915][CORE] Network-layer and shuffle RPC layer changes to support push shuffle blocks

Ngone51 commented on a change in pull request #29855:
URL: https://github.com/apache/spark/pull/29855#discussion_r494290466



##########
File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();
+      return 4 + b.serializedSizeInBytes();
+    }
+
+    public static void encode(ByteBuf buf, RoaringBitmap b) {
+      ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());
+      try {
+        b.serialize(new DataOutputStream(new OutputStream() {

Review comment:
       `RoaringBitmap` also has a `serialize(ByteBuffer buffer)` API. Why not use it instead of wrapping an `OutputStream`?
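
   For reference, a minimal sketch of the wire format described in the Javadoc above (a 4-byte length followed by the serialized bytes), written straight into a `ByteBuffer` with no stream wrapper. It uses `java.util.BitSet` as a stand-in for `RoaringBitmap` so that it runs without extra dependencies; the class name `LengthPrefixedBitmapSketch` and its methods are hypothetical and not part of the PR.

```java
import java.nio.ByteBuffer;
import java.util.BitSet;

// Hypothetical sketch: java.util.BitSet stands in for RoaringBitmap so the
// example needs no third-party dependency. Not the PR's implementation.
final class LengthPrefixedBitmapSketch {

  /** Bytes needed on the wire: a 4-byte length prefix plus the payload. */
  static int encodedLength(BitSet b) {
    return 4 + b.toByteArray().length;
  }

  /** Writes the payload length, then the payload, at the buffer's position. */
  static void encode(ByteBuffer buf, BitSet b) {
    byte[] payload = b.toByteArray();
    buf.putInt(payload.length);
    buf.put(payload);
  }

  /** Reads the length prefix, then exactly that many payload bytes. */
  static BitSet decode(ByteBuffer buf) {
    int length = buf.getInt();
    byte[] payload = new byte[length];
    buf.get(payload);
    return BitSet.valueOf(payload);
  }

  public static void main(String[] args) {
    BitSet original = new BitSet();
    original.set(3);
    original.set(64);

    // Size the buffer from encodedLength so that encode fills it exactly.
    ByteBuffer buf = ByteBuffer.allocate(encodedLength(original));
    encode(buf, original);
    buf.flip();

    System.out.println(decode(buf).equals(original));
  }
}
```

   With `RoaringBitmap`, the `serialize(ByteBuffer)` call mentioned above would presumably take the place of the `toByteArray()` copy, provided the buffer is sized from the same `serializedSizeInBytes()` value that `encodedLength` reports.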

##########
File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();
+      return 4 + b.serializedSizeInBytes();
+    }
+
+    public static void encode(ByteBuf buf, RoaringBitmap b) {
+      ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());

Review comment:
       Does this call to `serializedSizeInBytes` assume that `trim` and `runOptimize` have already been called? If so, please add a comment to clarify that.

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+// Needed by ScalaDoc. See SPARK-7726
+import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+
+/**
+ * Request to push a block to a remote shuffle service to be merged in push based shuffle.

Review comment:
       Looking at `BlockPushCallback`, it seems that `PushBlockStream` can also be sent from the remote shuffle service back to the pusher?

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
##########
@@ -43,6 +48,9 @@
  * (via BlockTransferService), which has the downside of losing the data if we lose the executors.
  */
 public class ExternalBlockStoreClient extends BlockStoreClient {
+  private static final Logger logger = LoggerFactory.getLogger(ExternalBlockStoreClient.class);

Review comment:
       The abstract class `BlockStoreClient` already initializes a `logger`. Why do we need a new one?
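
   To illustrate the point, a minimal sketch assuming the base class declares a protected instance logger keyed on the runtime class. The classes `BaseClientSketch` and `ExternalClientSketch` are hypothetical stand-ins for `BlockStoreClient` and `ExternalBlockStoreClient`, and `java.util.logging` replaces SLF4J only to keep the example dependency-free.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for the abstract base client.
abstract class BaseClientSketch {
  // Initialized per instance from the runtime class, so every subclass
  // gets a logger named after itself without declaring its own field.
  protected final Logger logger = Logger.getLogger(getClass().getName());
}

// Hypothetical stand-in for the concrete client under review.
final class ExternalClientSketch extends BaseClientSketch {
  String loggerName() {
    // Uses the inherited field; no second logger is declared here.
    return logger.getName();
  }

  public static void main(String[] args) {
    System.out.println(new ExternalClientSketch().loggerName());
  }
}
```

   One case where a separate `static` logger would still be needed is logging from a static method or a static nested class, since the inherited field is only reachable through an instance.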

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/MergeStatuses.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import java.util.Arrays;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.roaringbitmap.RoaringBitmap;
+
+import org.apache.spark.network.protocol.Encoders;
+
+
+/**
+ * Result returned by an ExternalShuffleService to a scheduler. This represents the result

Review comment:
       nit: `to a scheduler` -> `to the DAGScheduler`?

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+
+/**
+ * Request to notify external shuffle service to finalize merge for a given shuffle.

Review comment:
       Maybe "Request from DAGScheduler to"? 
   
   Since this PR is only part of the implementation and nothing sends this message yet, I think it would help reviewers to know who the sender is.

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -156,6 +193,20 @@ protected void handleMessage(
       Map<String, String[]> localDirs = blockManager.getLocalDirs(msg.appId, msg.execIds);
       callback.onSuccess(new LocalDirsForExecutors(localDirs).toByteBuffer());
 
+    } else if (msgObj instanceof FinalizeShuffleMerge) {
+      final Timer.Context responseDelayContext =
+          metrics.finalizeShuffleMergeLatencyMillis.time();
+      FinalizeShuffleMerge msg = (FinalizeShuffleMerge) msgObj;
+      try {
+        checkAuth(client, msg.appId);
+        MergeStatuses statuses = mergeManager.finalizeShuffleMerge(msg);
+        callback.onSuccess(statuses.toByteBuffer());
+      } catch(IOException e) {
+        throw new RuntimeException(String.format("Error while finalizing shuffle merge "
+            + "for application %s shuffle %d", msg.appId, msg.shuffleId));

Review comment:
       I think we should respond to DAGScheduler with an error instead of throwing an exception here so that the finalizing future at DAGScheduler can complete quickly.
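
   A minimal sketch of what I mean, assuming a callback with `onSuccess` and `onFailure` methods modelled loosely on the `callback` used in the hunk above. The interfaces `ResponseCallbackSketch` and `MergeFinalizerSketch` and the class `FinalizeHandlerSketch` are hypothetical and simplified, not the actual Spark types.

```java
import java.io.IOException;
import java.nio.ByteBuffer;

// Hypothetical, simplified stand-in for an RPC response callback.
interface ResponseCallbackSketch {
  void onSuccess(ByteBuffer response);
  void onFailure(Throwable cause);
}

// Hypothetical stand-in for the merge manager's finalize call.
interface MergeFinalizerSketch {
  ByteBuffer finalizeMerge(String appId, int shuffleId) throws IOException;
}

final class FinalizeHandlerSketch {
  /**
   * Reports an IOException to the caller through the callback instead of
   * throwing, so the requester learns about the failure right away.
   */
  static void handle(
      String appId,
      int shuffleId,
      MergeFinalizerSketch finalizer,
      ResponseCallbackSketch callback) {
    try {
      callback.onSuccess(finalizer.finalizeMerge(appId, shuffleId));
    } catch (IOException e) {
      // Keep the original exception as the cause so the stack trace survives.
      callback.onFailure(new RuntimeException(String.format(
          "Error while finalizing shuffle merge for application %s shuffle %d",
          appId, shuffleId), e));
    }
  }
}
```

   Passing `e` as the cause also keeps the underlying I/O error visible to whoever receives the failure.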

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+// Needed by ScalaDoc. See SPARK-7726
+import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+
+/**
+ * Request to push a block to a remote shuffle service to be merged in push based shuffle.
+ */
+public class PushBlockStream extends BlockTransferMessage {
+  public final String appId;
+  public final String blockId;
+  public final int index;

Review comment:
       What does `index` mean here? Could you add some comments to explain?

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -373,6 +427,54 @@ public ManagedBuffer next() {
     }
   }
 
+  /**
+   * Dummy implementation of merged shuffle file manager. Suitable for when push-based shuffle
+   * is not enabled.
+   */
+  private static class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
+
+    @Override
+    public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
+      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+    }
+
+    @Override
+    public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
+      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+    }
+
+    @Override
+    public void registerApplication(String appId, String user) {

Review comment:
       Why do we need `user` here? Is it a system user or something else?



