Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/09/24 17:25:03 UTC

[GitHub] [spark] Victsm commented on a change in pull request #29855: [SPARK-32915][CORE] Network-layer and shuffle RPC layer changes to support push shuffle blocks

Victsm commented on a change in pull request #29855:
URL: https://github.com/apache/spark/pull/29855#discussion_r494487660



##########
File path: common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
##########
@@ -44,6 +51,71 @@ public static String decode(ByteBuf buf) {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it
+      b.trim();
+      b.runOptimize();
+      return 4 + b.serializedSizeInBytes();
+    }
+
+    public static void encode(ByteBuf buf, RoaringBitmap b) {
+      ByteBuffer outBuffer = ByteBuffer.allocate(b.serializedSizeInBytes());
+      try {
+        b.serialize(new DataOutputStream(new OutputStream() {
+          ByteBuffer buffer;
+
+          OutputStream init(ByteBuffer buffer) {
+            this.buffer = buffer;
+            return this;
+          }
+
+          @Override
+          public void close() {
+          }
+
+          @Override
+          public void flush() {
+          }
+
+          @Override
+          public void write(int b) {
+            buffer.put((byte) b);
+          }
+
+          @Override
+          public void write(byte[] b) {
+            buffer.put(b);
+          }
+
+          @Override
+          public void write(byte[] b, int off, int l) {
+            buffer.put(b, off, l);
+          }
+        }.init(outBuffer)));
+      } catch (IOException e) {
+        throw new RuntimeException("Exception while encoding bitmap", e);
+      }
+      byte[] bytes = outBuffer.array();
+      buf.writeInt(bytes.length);
+      buf.writeBytes(bytes);
+    }
+
+    public static RoaringBitmap decode(ByteBuf buf) {
+      int length = buf.readInt();
+      byte[] bytes = new byte[length];
+      buf.readBytes(bytes);

Review comment:
       This would require using ByteArrays.encode to encode the original byte array. I think @Ngone51's earlier recommendation makes sense: we should use RoaringBitmap#serialize(ByteBuffer) to avoid the one additional memory copy during encoding. With that change we serialize directly into the ByteBuf, so there is no intermediate byte array left to pass to ByteArrays.encode.
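
       To make the copy-avoidance concrete, here is a minimal sketch of a length-prefixed encoder that serializes straight into the destination buffer. It uses only the JDK so it can run on its own: ToyBitmap is a hypothetical stand-in for RoaringBitmap (it exposes only serializedSizeInBytes() and serialize(ByteBuffer)), and java.nio.ByteBuffer stands in for Netty's ByteBuf. The class and method names are assumptions for illustration, not the code in this PR.

```java
import java.nio.ByteBuffer;
import java.util.BitSet;

/** Hypothetical stand-in for RoaringBitmap; not the real library class. */
final class ToyBitmap {
  private final byte[] words;

  ToyBitmap(BitSet bits) {
    this.words = bits.toByteArray();
  }

  /** Number of bytes that serialize(ByteBuffer) will write. */
  int serializedSizeInBytes() {
    return words.length;
  }

  /** Writes the serialized form at the buffer's current position. */
  void serialize(ByteBuffer out) {
    out.put(words);
  }

  BitSet toBitSet() {
    return BitSet.valueOf(words);
  }
}

/** Sketch of an encoder that skips the temporary buffer used in the diff above. */
final class DirectBitmapEncoder {
  /** 4 bytes for the length prefix plus the serialized bitmap. */
  static int encodedLength(ToyBitmap b) {
    return 4 + b.serializedSizeInBytes();
  }

  static void encode(ByteBuffer buf, ToyBitmap b) {
    // Write the length prefix first, then let the bitmap write itself
    // directly into the destination. No intermediate byte[] is allocated.
    buf.putInt(b.serializedSizeInBytes());
    b.serialize(buf);
  }

  static ToyBitmap decode(ByteBuffer buf) {
    // Read the length prefix, then exactly that many payload bytes.
    int length = buf.getInt();
    byte[] bytes = new byte[length];
    buf.get(bytes);
    return new ToyBitmap(BitSet.valueOf(bytes));
  }
}
```

       With the real classes, the same idea would presumably mean handing RoaringBitmap#serialize(ByteBuffer) an NIO view of the ByteBuf's writable region and then advancing the writer index by the serialized size. Because the bytes never pass through a standalone byte array, ByteArrays.encode has nothing to operate on, which is the trade-off described above.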



