You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@beam.apache.org by em...@apache.org on 2022/04/12 19:46:43 UTC

[beam] branch master updated: [BEAM-14116] Rollback "Chunk commit requests dynamically (#17004)" (#17352)

This is an automated email from the ASF dual-hosted git repository.

emilyye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 390fe994d19 [BEAM-14116] Rollback "Chunk commit requests dynamically (#17004)" (#17352)
390fe994d19 is described below

commit 390fe994d191357f4651645d033cc07ad6acd826
Author: emily <em...@google.com>
AuthorDate: Tue Apr 12 12:46:33 2022 -0700

    [BEAM-14116] Rollback "Chunk commit requests dynamically (#17004)" (#17352)
---
 .../worker/windmill/GrpcWindmillServer.java        | 105 ++++++---------------
 1 file changed, 29 insertions(+), 76 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index e914ef160de..3f418ca413c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow.worker.windmill;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.io.SequenceInputStream;
 import java.net.URI;
@@ -120,8 +119,8 @@ import org.slf4j.LoggerFactory;
 public class GrpcWindmillServer extends WindmillServerStub {
   private static final Logger LOG = LoggerFactory.getLogger(GrpcWindmillServer.class);
 
-  // If a connection cannot be established, gRPC will fail fast so this deadline can be
-  // relatively high.
+  // If a connection cannot be established, gRPC will fail fast so this deadline can be relatively
+  // high.
   private static final long DEFAULT_UNARY_RPC_DEADLINE_SECONDS = 300;
   private static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
 
@@ -1471,82 +1470,36 @@ public class GrpcWindmillServer extends WindmillServerStub {
       }
     }
 
-    // An OutputStream which splits the output into chunks of no more than COMMIT_STREAM_CHUNK_SIZE
-    // before calling the chunkWriter on each.
-    //
-    // This avoids materializing the whole serialized request in the case it is large.
-    private class ChunkingByteStream extends OutputStream {
-      private final ByteString.Output output = ByteString.newOutput(COMMIT_STREAM_CHUNK_SIZE);
-      private final Consumer<ByteString> chunkWriter;
-
-      ChunkingByteStream(Consumer<ByteString> chunkWriter) {
-        this.chunkWriter = chunkWriter;
-      }
-
-      @Override
-      public void close() {
-        flushBytes();
-      }
-
-      @Override
-      public void write(int b) throws IOException {
-        output.write(b);
-        if (output.size() == COMMIT_STREAM_CHUNK_SIZE) {
-          flushBytes();
-        }
-      }
-
-      @Override
-      public void write(byte b[], int currentOffset, int len) throws IOException {
-        final int endOffset = currentOffset + len;
-        while ((endOffset - currentOffset) + output.size() >= COMMIT_STREAM_CHUNK_SIZE) {
-          int writeSize = COMMIT_STREAM_CHUNK_SIZE - output.size();
-          output.write(b, currentOffset, writeSize);
-          currentOffset += writeSize;
-          flushBytes();
-        }
-        if (currentOffset != endOffset) {
-          output.write(b, currentOffset, endOffset - currentOffset);
-        }
-      }
-
-      private void flushBytes() {
-        if (output.size() == 0) {
-          return;
-        }
-        chunkWriter.accept(output.toByteString());
-        output.reset();
-      }
-    }
-
     private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
       Preconditions.checkNotNull(pendingRequest.computation);
-      Consumer<ByteString> chunkWriter =
-          new Consumer<ByteString>() {
-            private long remaining = pendingRequest.request.getSerializedSize();
+      final ByteString serializedCommit = pendingRequest.request.toByteString();
 
-            @Override
-            public void accept(ByteString chunk) {
-              StreamingCommitRequestChunk.Builder chunkBuilder =
-                  StreamingCommitRequestChunk.newBuilder()
-                      .setRequestId(id)
-                      .setSerializedWorkItemCommit(chunk)
-                      .setComputationId(pendingRequest.computation)
-                      .setShardingKey(pendingRequest.request.getShardingKey());
-              Preconditions.checkState(remaining >= chunk.size());
-              remaining -= chunk.size();
-              if (remaining > 0) {
-                chunkBuilder.setRemainingBytesForWorkItem(remaining);
-              }
-              StreamingCommitWorkRequest requestChunk =
-                  StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build();
-              send(requestChunk);
-            }
-          };
-      try (ChunkingByteStream s = new ChunkingByteStream(chunkWriter)) {
-        pendingRequest.request.writeTo(s);
-      } catch (IllegalStateException | IOException e) {
-        LOG.info("Stream was broken, request will be retried when stream is reopened.", e);
+      synchronized (this) {
+        pending.put(id, pendingRequest);
+        for (int i = 0; i < serializedCommit.size(); i += COMMIT_STREAM_CHUNK_SIZE) {
+          int end = i + COMMIT_STREAM_CHUNK_SIZE;
+          ByteString chunk = serializedCommit.substring(i, Math.min(end, serializedCommit.size()));
+
+          StreamingCommitRequestChunk.Builder chunkBuilder =
+              StreamingCommitRequestChunk.newBuilder()
+                  .setRequestId(id)
+                  .setSerializedWorkItemCommit(chunk)
+                  .setComputationId(pendingRequest.computation)
+                  .setShardingKey(pendingRequest.request.getShardingKey());
+          int remaining = serializedCommit.size() - end;
+          if (remaining > 0) {
+            chunkBuilder.setRemainingBytesForWorkItem(remaining);
+          }
+
+          StreamingCommitWorkRequest requestChunk =
+              StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build();
+          try {
+            send(requestChunk);
+          } catch (IllegalStateException e) {
+            // Stream was broken, request will be retried when stream is reopened.
+            break;
+          }
+        }
       }
     }
   }