Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/09/24 07:00:23 UTC

[GitHub] [spark] mridulm commented on a change in pull request #29855: [SPARK-32915][CORE] Network-layer and shuffle RPC layer changes to support push shuffle blocks

mridulm commented on a change in pull request #29855:
URL: https://github.com/apache/spark/pull/29855#discussion_r494003389



##########
File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();
+      return 4 + b.serializedSizeInBytes();
+    }
+
+    public static void encode(ByteBuf buf, RoaringBitmap b) {
+      ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());
+      try {
+        b.serialize(new DataOutputStream(new OutputStream() {
+          ByteBuffer buffer;
+
+          OutputStream init(ByteBuffer buffer) {
+            this.buffer = buffer;
+            return this;
+          }
+
+          @Override
+          public void close() {
+          }
+
+          @Override
+          public void flush() {
+          }
+
+          @Override
+          public void write(int b) {
+            buffer.put((byte) b);
+          }
+
+          @Override
+          public void write(byte[] b) {
+            buffer.put(b);
+          }
+
+          @Override
+          public void write(byte[] b, int off, int l) {
+            buffer.put(b, off, l);
+          }
+        }.init(outBuffer)));
+      } catch (IOException e) {
+        throw new RuntimeException("Exception while encoding bitmap", e);
+      }

Review comment:
       Replace this with something more concise - for example, see `UnsafeShuffleWriter.MyByteArrayOutputStream`.
   To illustrate, something like:
   ```
   MyBaos out = new MyBaos(b.serializedSizeInBytes());
   b.serialize(new DataOutputStream(out));
   int size = out.size();
   buf.writeInt(size);
   buf.writeBytes(out.getBuf(), 0, size);
   ```
   
   The last part could also be moved as `ByteArrays.encode(byte[] arr, int offset, int len)`
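
   For context, `UnsafeShuffleWriter.MyByteArrayOutputStream` is essentially a `ByteArrayOutputStream` that exposes its internal array; a minimal sketch of the same idea (the `MyBaos`/`getBuf` names above are illustrative, not existing API):
   ```
   import java.io.ByteArrayOutputStream;

   // Exposes ByteArrayOutputStream's protected `buf` field so the serialized
   // bytes can be copied into the ByteBuf without an intermediate array copy.
   class MyBaos extends ByteArrayOutputStream {
     MyBaos(int size) {
       super(size);
     }

     byte[] getBuf() {
       return buf;  // may be longer than size(); always pair with size()
     }
   }
   ```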

##########
File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();

Review comment:
       `BitmapArrays` results in calling `trim` and `runOptimize` twice - can we refactor so they are only called once on this codepath?
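
   A hedged sketch of one way to do that (assuming `BitmapArrays` sizes and encodes each bitmap via `Bitmaps`): compress each bitmap exactly once up front, and let the length/encode paths assume an already-optimized bitmap:
   ```
   // Illustrative only, not the PR's code: callers run compress() once per
   // bitmap, then size and encode without calling trim()/runOptimize() again.
   static void compress(RoaringBitmap b) {
     b.trim();
     b.runOptimize();
   }

   static int encodedLengthOfCompressed(RoaringBitmap b) {
     return 4 + b.serializedSizeInBytes();  // int length prefix + payload
   }
   ```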

##########
File path: common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
##########
@@ -209,12 +225,17 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
         public void onComplete(String streamId) throws IOException {
            try {
              streamHandler.onComplete(streamId);
-             callback.onSuccess(ByteBuffer.allocate(0));
+             callback.onSuccess(meta.duplicate());

Review comment:
       Can you add a comment on why we are making this change, i.e., why we switched from sending an empty buffer back to sending `meta`?

##########
File path: common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
##########
@@ -181,6 +182,17 @@ public void onFailure(Throwable e) {
   private void processStreamUpload(final UploadStream req) {
     assert (req.body() == null);
     try {
+      // Retain the original metadata buffer, since it will be used during the invocation of
+      // this method. Will be released later.
+      req.meta.retain();
+      // Make a copy of the original metadata buffer. In benchmarks, we noticed that
+      // we cannot send the original metadata buffer back to the client; otherwise, in
+      // cases where multiple concurrent shuffles are present, the wrong metadata might
+      // be sent back to a client. This is related to the eager release of the metadata
+      // buffer, i.e., we always release the original buffer by the time the invocation
+      // of this method ends, instead of when we send it back to the client. This is
+      // necessary; otherwise we start seeing memory issues very quickly in benchmarks.
+      ByteBuffer meta = cloneBuffer(req.meta.nioByteBuffer());

Review comment:
       Since we are always making a copy of `meta` here, can we remove the `retain` + `release` below, always release the original here instead, and rely only on the cloned buffer within this method?
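
   Roughly like this (a sketch only; `cloneBuffer` is the PR's helper, and this assumes no later code path still needs the original buffer):
   ```
   // Copy the metadata once at the top of the method and drop the extra
   // retain()/release() pairing; everything below reads only from `meta`,
   // so the original req.meta can be released when this method returns.
   ByteBuffer meta = cloneBuffer(req.meta.nioByteBuffer());
   ```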

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+
+/**
+ * Similar to {@link OneForOneBlockFetcher}, but for pushing blocks to a remote shuffle
+ * service to be merged, instead of fetching them from remote shuffle services. This is
+ * used by ShuffleWriter when the block push process is initiated. The supplied
+ * BlockFetchingListener is used to handle the success or failure of pushing each block.
+ */
+public class OneForOneBlockPusher {
+  private static final Logger logger = LoggerFactory.getLogger(OneForOneBlockPusher.class);
+
+  private final TransportClient client;
+  private final String appId;
+  private final String[] blockIds;
+  private final BlockFetchingListener listener;
+  private final RpcResponseCallback callback;
+  private final Map<String, ManagedBuffer> buffers;
+
+  public OneForOneBlockPusher(
+      TransportClient client,
+      String appId,
+      String[] blockIds,
+      BlockFetchingListener listener,

Review comment:
       Instead of reusing `BlockFetchingListener`, add a new interface for push?
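
   For illustration, a hypothetical shape for such an interface (the name and method names are made up here, mirroring `BlockFetchingListener`):
   ```
   import java.util.EventListener;
   import org.apache.spark.network.buffer.ManagedBuffer;

   // Hypothetical push-specific counterpart to BlockFetchingListener.
   public interface BlockPushingListener extends EventListener {
     // Called once per block that has been successfully pushed.
     void onBlockPushSuccess(String blockId, ManagedBuffer data);

     // Called at most once per block when a push fails.
     void onBlockPushFailure(String blockId, Throwable exception);
   }
   ```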
   

##########
File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();
+      return 4 + b.serializedSizeInBytes();
+    }
+
+    public static void encode(ByteBuf buf, RoaringBitmap b) {
+      ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());
+      try {
+        b.serialize(new DataOutputStream(new OutputStream() {
+          ByteBuffer buffer;
+
+          OutputStream init(ByteBuffer buffer) {
+            this.buffer = buffer;
+            return this;
+          }
+
+          @Override
+          public void close() {
+          }
+
+          @Override
+          public void flush() {
+          }
+
+          @Override
+          public void write(int b) {
+            buffer.put((byte) b);
+          }
+
+          @Override
+          public void write(byte[] b) {
+            buffer.put(b);
+          }
+
+          @Override
+          public void write(byte[] b, int off, int l) {
+            buffer.put(b, off, l);
+          }
+        }.init(outBuffer)));
+      } catch (IOException e) {
+        throw new RuntimeException("Exception while encoding bitmap", e);
+      }
+      byte[] bytes = outBuffer.array();
+      buf.writeInt(bytes.length);
+      buf.writeBytes(bytes);
+    }
+
+    public static RoaringBitmap decode(ByteBuf buf) {
+      int length = buf.readInt();
+      byte[] bytes = new byte[length];
+      buf.readBytes(bytes);

Review comment:
       Use `ByteArrays.decode` here?
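
   That is, something along these lines (assuming `ByteArrays.decode` reads the int length prefix followed by the payload, mirroring `ByteArrays.encode`, and with `java.io.ByteArrayInputStream`/`DataInputStream` imports added):
   ```
   public static RoaringBitmap decode(ByteBuf buf) {
     RoaringBitmap bitmap = new RoaringBitmap();
     try {
       // ByteArrays.decode consumes the length prefix and returns the payload.
       byte[] bytes = ByteArrays.decode(buf);
       bitmap.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
     } catch (IOException e) {
       throw new RuntimeException("Exception while decoding bitmap", e);
     }
     return bitmap;
   }
   ```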

##########
File path: common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
##########
@@ -238,12 +259,26 @@ public String getID() {
       }
     } catch (Exception e) {
       logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
-      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+      try {
+        // It's OK to send the original metadata buffer back here, because this is
+        // still inside the invocation of this method.
+        respond(new RpcFailure(req.requestId,
+            JavaUtils.encodeHeaderIntoErrorString(req.meta.nioByteBuffer(), e)));
+      } catch (IOException ioe) {
+        // No exception will be thrown here. req.meta.nioByteBuffer will not throw an
+        // IOException because it's a NettyManagedBuffer. This try-catch block is only
+        // there to make the compiler happy.
+        logger.error("Error in handling failure while invoking RpcHandler#receive() on RPC id {}",

Review comment:
       If this is not expected, `assert` on it instead?
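
   For example (a sketch; this assumes the `IOException` really is impossible for a `NettyManagedBuffer`):
   ```
   try {
     respond(new RpcFailure(req.requestId,
         JavaUtils.encodeHeaderIntoErrorString(req.meta.nioByteBuffer(), e)));
   } catch (IOException ioe) {
     // req.meta is a NettyManagedBuffer, so nioByteBuffer() should never
     // throw; fail loudly rather than logging if that assumption breaks.
     throw new AssertionError("Unexpected IOException from NettyManagedBuffer", ioe);
   }
   ```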

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.net.ConnectException;
+
+/**
+ * Plugs into {@link RetryingBlockFetcher} to further control when an exception should be retried
+ * and logged.
+ * Note: {@link RetryingBlockFetcher} will delegate the exception to this handler only when
+ * - remaining retries < max retries
+ * - exception is an IOException
+ */
+
+public interface ErrorHandler {
+  boolean shouldRetryError(Throwable t);
+
+  default boolean shouldLogError(Throwable t) {
+    return true;
+  }
+
+  /**
+   * A no-op error handler instance.
+   */
+  ErrorHandler NOOP_ERROR_HANDLER = t -> true;
+
+  /**
+   * The error handler for pushing shuffle blocks to remote shuffle services.
+   */
+  class BlockPushErrorHandler implements ErrorHandler {
+
+    @Override
+    public boolean shouldRetryError(Throwable t) {
+      // If it is a connection timeout or a connection-closed exception, there is no need to retry.
+      if (t.getCause() != null && t.getCause() instanceof ConnectException) {
+        return false;
+      }
+      // If the block is too late, there is no need to retry it
+      return (t.getMessage() == null || !t.getMessage()
+          .contains(BlockPushException.TOO_LATE_MESSAGE_SUFFIX)) && (t.getCause() == null
+          || t.getCause().getMessage() == null || !t.getCause().getMessage()
+          .contains(BlockPushException.TOO_LATE_MESSAGE_SUFFIX));
+    }
+
+    @Override
+    public boolean shouldLogError(Throwable t) {
+      return (t.getMessage() == null || (
+          !t.getMessage().contains(BlockPushException.COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX)
+              && !t.getMessage().contains(BlockPushException.TOO_LATE_MESSAGE_SUFFIX))) && (
+          t.getCause() == null || t.getCause().getMessage() == null || (!t.getCause()
+              .getMessage()
+              .contains(BlockPushException.COULD_NOT_FIND_OPPORTUNITY_MSG_PREFIX) && !t.getCause()
+              .getMessage()
+              .contains(BlockPushException.TOO_LATE_MESSAGE_SUFFIX)));
+    }

Review comment:
       nit: Fix style/indentation in this class.
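
   For example, the nested boolean chains could be flattened with a small helper (a behavior-preserving sketch, not a prescription):
   ```
   // One message-matching helper; note instanceof already handles a null cause.
   private static boolean messageContains(Throwable t, String fragment) {
     return t != null && t.getMessage() != null && t.getMessage().contains(fragment);
   }

   @Override
   public boolean shouldRetryError(Throwable t) {
     // Connection failures will not be fixed by retrying the same push.
     if (t.getCause() instanceof ConnectException) {
       return false;
     }
     // A block that arrived too late will never be accepted; don't retry it.
     return !messageContains(t, BlockPushException.TOO_LATE_MESSAGE_SUFFIX)
         && !messageContains(t.getCause(), BlockPushException.TOO_LATE_MESSAGE_SUFFIX);
   }
   ```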




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org