Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/07 16:33:40 UTC

[GitHub] [beam] lukecwik commented on a diff in pull request #17802: [BEAM-14545] Optimize copies in dataflow v1 shuffle reader.

lukecwik commented on code in PR #17802:
URL: https://github.com/apache/beam/pull/17802#discussion_r891443402


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ByteArrayReader.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import java.nio.ByteBuffer;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
+
+class ByteArrayReader {
+  private static final ByteBuffer EMPTY = ByteBuffer.allocate(0).asReadOnlyBuffer();
+
+  private final byte[] arr;
+  private int pos;
+
+  public ByteArrayReader(byte[] arr) {
+    this.arr = arr;
+    this.pos = 0;
+  }
+
+  public int available() {
+    return arr.length - pos;
+  }
+
+  public int readInt() {
+    int ret = Ints.fromBytes(arr[pos], arr[pos + 1], arr[pos + 2], arr[pos + 3]);
+    pos += 4;
+    return ret;
+  }
+
+  public ByteBuffer read(int size) {

Review Comment:
   It doesn't seem like we use the ByteBuffer anywhere in the code; should we just return a ByteString instead?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ByteArrayShufflePosition.java:
##########
@@ -58,15 +65,15 @@ public static byte[] getPosition(@Nullable ShufflePosition shufflePosition) {
     }
     Preconditions.checkArgument(shufflePosition instanceof ByteArrayShufflePosition);
     ByteArrayShufflePosition adapter = (ByteArrayShufflePosition) shufflePosition;
-    return adapter.getPosition();
+    return adapter.getPosition().toByteArray();

Review Comment:
   Can we get rid of this method since toByteArray() does a copy?



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/TestShuffleReader.java:
##########
@@ -47,10 +47,11 @@ public class TestShuffleReader implements ShuffleEntryReader {
         if (o2 == null) {
           return 1;
         }
-        return UnsignedBytes.lexicographicalComparator().compare(o1, o2);
+        return UnsignedBytes.lexicographicalComparator()

Review Comment:
   Why not use the lexicographical comparator that works on ByteStrings directly?
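The reviewer is presumably referring to protobuf's ByteString.unsignedLexicographicalComparator(), which orders ByteStrings byte by byte as unsigned values without first copying them out to byte[]. To keep this sketch JDK-only, it applies the same ordering to plain byte[] with Arrays.compareUnsigned (Java 9+). The class name UnsignedByteOrder is hypothetical, and the nulls-first policy is an assumption based on the `o2 == null` branch visible in the diff.

```java
import java.util.Arrays;
import java.util.Comparator;

public class UnsignedByteOrder {
  // Lexicographic order treating each byte as unsigned (0..255); a shorter
  // array that is a prefix of a longer one sorts first. Nulls sort before
  // any non-null array (assumed policy, see lead-in).
  static final Comparator<byte[]> UNSIGNED_LEXICOGRAPHICAL =
      Comparator.nullsFirst((byte[] a, byte[] b) -> Arrays.compareUnsigned(a, b));

  public static void main(String[] args) {
    byte[] low = {0x01};
    byte[] high = {(byte) 0xFF};
    // Unsigned comparison: 0x01 < 0xFF.
    System.out.println(UNSIGNED_LEXICOGRAPHICAL.compare(low, high) < 0); // true
    // Signed comparison disagrees, because (byte) 0xFF == -1.
    System.out.println(Arrays.compare(low, high) < 0); // false
  }
}
```

The signed/unsigned distinction matters for shuffle keys because encoded keys routinely contain bytes >= 0x80, which a signed comparison would sort before 0x00.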



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleEntry.java:
##########
@@ -26,18 +28,34 @@
 })
 public class ShuffleEntry {
   final ShufflePosition position;
-  final byte[] key;
-  final byte[] secondaryKey;
-  final byte[] value;
+  final ByteString key;
+  final ByteString secondaryKey;
+  final ByteString value;
 
-  public ShuffleEntry(byte[] key, byte[] secondaryKey, byte[] value) {
+  public ShuffleEntry(ByteString key, ByteString secondaryKey, ByteString value) {
     this.position = null;
     this.key = key;
     this.secondaryKey = secondaryKey;
     this.value = value;
   }
 
+  public ShuffleEntry(byte[] key, byte[] secondaryKey, byte[] value) {
+    this(
+        key == null ? null : ByteString.copyFrom(key),

Review Comment:
   Why not use UnsafeByteOperations.unsafeWrap instead of copying?
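For context: protobuf's ByteString.copyFrom(byte[]) makes a defensive copy, while UnsafeByteOperations.unsafeWrap(byte[]) shares the caller's array, which saves the copy but is only safe if nobody mutates that array afterwards. The JDK-only analogy below (hypothetical class WrapVersusCopy, with a read-only ByteBuffer standing in for ByteString) illustrates the aliasing trade-off; it is not the protobuf implementation.

```java
import java.nio.ByteBuffer;

public class WrapVersusCopy {
  // Zero-copy view sharing the caller's array (analogous to unsafeWrap).
  static ByteBuffer wrap(byte[] bytes) {
    return ByteBuffer.wrap(bytes).asReadOnlyBuffer();
  }

  // Independent snapshot of the array (analogous to ByteString.copyFrom).
  static ByteBuffer copy(byte[] bytes) {
    return ByteBuffer.wrap(bytes.clone()).asReadOnlyBuffer();
  }

  public static void main(String[] args) {
    byte[] key = {1, 2, 3};
    ByteBuffer wrapped = wrap(key);
    ByteBuffer copied = copy(key);

    key[0] = 42; // the caller mutates its array after construction

    System.out.println(wrapped.get(0)); // 42: the read-only view changed underneath us
    System.out.println(copied.get(0)); // 1: the copy is unaffected
  }
}
```

Wrapping is therefore appropriate in the byte[] constructors only if the arrays passed to ShuffleEntry are freshly decoded buffers that no other code retains or modifies; that ownership is an assumption to confirm against the callers.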



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleEntry.java:
##########
@@ -26,18 +28,34 @@
 })
 public class ShuffleEntry {
   final ShufflePosition position;
-  final byte[] key;
-  final byte[] secondaryKey;
-  final byte[] value;
+  final ByteString key;
+  final ByteString secondaryKey;
+  final ByteString value;
 
-  public ShuffleEntry(byte[] key, byte[] secondaryKey, byte[] value) {
+  public ShuffleEntry(ByteString key, ByteString secondaryKey, ByteString value) {
     this.position = null;
     this.key = key;
     this.secondaryKey = secondaryKey;
     this.value = value;
   }
 
+  public ShuffleEntry(byte[] key, byte[] secondaryKey, byte[] value) {
+    this(
+        key == null ? null : ByteString.copyFrom(key),
+        secondaryKey == null ? null : ByteString.copyFrom(secondaryKey),
+        value == null ? null : ByteString.copyFrom(value));
+  }
+
   public ShuffleEntry(ShufflePosition position, byte[] key, byte[] secondaryKey, byte[] value) {
+    this(
+        position,
+        ByteString.copyFrom(key),

Review Comment:
   Why not use UnsafeByteOperations.unsafeWrap instead of copying?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java:
##########
@@ -17,18 +17,18 @@
  */
 package org.apache.beam.runners.dataflow.worker;
 
-import java.io.ByteArrayInputStream;
+import com.google.protobuf.ByteString;

Review Comment:
   The worker doesn't depend on protobuf directly.

   It's probably best to import protobuf via the vendored gRPC package paths.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PartitioningShuffleReader.java:
##########
@@ -144,9 +143,10 @@ public boolean advance() throws IOException {
         return false;
       }
       ShuffleEntry record = iterator.next();
-      K key = CoderUtils.decodeFromByteArray(shuffleReader.keyCoder, record.getKey());
+      K key = shuffleReader.keyCoder.decode(record.getKey().newInput(), Coder.Context.OUTER);

Review Comment:
   This check that all the bytes were read is important, as it catches coder issues:
   https://github.com/apache/beam/blob/e62ae391985fc13c7df1ee6e088525835ceaa560/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java#L102

   Please re-add the check here and elsewhere (probably via a convenience method of some kind).
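One possible shape for the requested convenience method, as a sketch: decode a single value, then fail if the input still has unread bytes, which is the same trailing-bytes check CoderUtils.decodeFromByteArray performs. The Decoder interface, decodeFully, and INT_DECODER are hypothetical stand-ins so the example compiles with only the JDK. In the worker, the helper would presumably take a Coder<T> plus the ByteString from ShuffleEntry (e.g. via newInput()) and throw Beam's CoderException (a subclass of IOException); the check assumes the stream's available() reports the exact number of remaining bytes, as ByteArrayInputStream's does.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

public class DecodeFully {
  // Hypothetical stand-in for Beam's Coder<T>.decode(InputStream).
  interface Decoder<T> {
    T decode(InputStream in) throws IOException;
  }

  // Decodes exactly one value and rejects leftover bytes, which usually
  // indicate a coder mismatch between the writer and the reader.
  static <T> T decodeFully(Decoder<T> decoder, byte[] bytes) throws IOException {
    ByteArrayInputStream in = new ByteArrayInputStream(bytes);
    T result = decoder.decode(in);
    if (in.available() != 0) {
      throw new IOException(in.available() + " unexpected extra bytes after decoding " + result);
    }
    return result;
  }

  // Example decoder: a 4-byte big-endian int.
  static final Decoder<Integer> INT_DECODER = in -> new DataInputStream(in).readInt();

  public static void main(String[] args) throws IOException {
    System.out.println(decodeFully(INT_DECODER, new byte[] {0, 0, 1, 0})); // 256
    try {
      decodeFully(INT_DECODER, new byte[] {0, 0, 1, 0, 7});
    } catch (IOException e) {
      System.out.println(e.getMessage()); // 1 unexpected extra bytes after decoding 256
    }
  }
}
```

Centralizing the check in one helper keeps the PartitioningShuffleReader call site (and the other decode sites) from silently accepting malformed records.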



-- 
This is an automated message from the Apache Git Service.