You are viewing a plain text version of this content. The canonical link for it is not included in this plain-text copy.
Posted to commits@beam.apache.org by em...@apache.org on 2022/04/12 19:46:43 UTC
[beam] branch master updated: [BEAM-14116] Rollback "Chunk commit requests dynamically (#17004)" (#17352)
This is an automated email from the ASF dual-hosted git repository.
emilyye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 390fe994d19 [BEAM-14116] Rollback "Chunk commit requests dynamically (#17004)" (#17352)
390fe994d19 is described below
commit 390fe994d191357f4651645d033cc07ad6acd826
Author: emily <em...@google.com>
AuthorDate: Tue Apr 12 12:46:33 2022 -0700
[BEAM-14116] Rollback "Chunk commit requests dynamically (#17004)" (#17352)
---
.../worker/windmill/GrpcWindmillServer.java | 105 ++++++---------------
1 file changed, 29 insertions(+), 76 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index e914ef160de..3f418ca413c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.dataflow.worker.windmill;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.SequenceInputStream;
import java.net.URI;
@@ -120,8 +119,8 @@ import org.slf4j.LoggerFactory;
public class GrpcWindmillServer extends WindmillServerStub {
private static final Logger LOG = LoggerFactory.getLogger(GrpcWindmillServer.class);
- // If a connection cannot be established, gRPC will fail fast so this deadline can be
- // relatively high.
+ // If a connection cannot be established, gRPC will fail fast so this deadline can be relatively
+ // high.
private static final long DEFAULT_UNARY_RPC_DEADLINE_SECONDS = 300;
private static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
@@ -1471,82 +1470,36 @@ public class GrpcWindmillServer extends WindmillServerStub {
}
}
- // An OutputStream which splits the output into chunks of no more than COMMIT_STREAM_CHUNK_SIZE
- // before calling the chunkWriter on each.
- //
- // This avoids materializing the whole serialized request in the case it is large.
- private class ChunkingByteStream extends OutputStream {
- private final ByteString.Output output = ByteString.newOutput(COMMIT_STREAM_CHUNK_SIZE);
- private final Consumer<ByteString> chunkWriter;
-
- ChunkingByteStream(Consumer<ByteString> chunkWriter) {
- this.chunkWriter = chunkWriter;
- }
-
- @Override
- public void close() {
- flushBytes();
- }
-
- @Override
- public void write(int b) throws IOException {
- output.write(b);
- if (output.size() == COMMIT_STREAM_CHUNK_SIZE) {
- flushBytes();
- }
- }
-
- @Override
- public void write(byte b[], int currentOffset, int len) throws IOException {
- final int endOffset = currentOffset + len;
- while ((endOffset - currentOffset) + output.size() >= COMMIT_STREAM_CHUNK_SIZE) {
- int writeSize = COMMIT_STREAM_CHUNK_SIZE - output.size();
- output.write(b, currentOffset, writeSize);
- currentOffset += writeSize;
- flushBytes();
- }
- if (currentOffset != endOffset) {
- output.write(b, currentOffset, endOffset - currentOffset);
- }
- }
-
- private void flushBytes() {
- if (output.size() == 0) {
- return;
- }
- chunkWriter.accept(output.toByteString());
- output.reset();
- }
- }
-
private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
Preconditions.checkNotNull(pendingRequest.computation);
- Consumer<ByteString> chunkWriter =
- new Consumer<ByteString>() {
- private long remaining = pendingRequest.request.getSerializedSize();
+ final ByteString serializedCommit = pendingRequest.request.toByteString();
- @Override
- public void accept(ByteString chunk) {
- StreamingCommitRequestChunk.Builder chunkBuilder =
- StreamingCommitRequestChunk.newBuilder()
- .setRequestId(id)
- .setSerializedWorkItemCommit(chunk)
- .setComputationId(pendingRequest.computation)
- .setShardingKey(pendingRequest.request.getShardingKey());
- Preconditions.checkState(remaining >= chunk.size());
- remaining -= chunk.size();
- if (remaining > 0) {
- chunkBuilder.setRemainingBytesForWorkItem(remaining);
- }
- StreamingCommitWorkRequest requestChunk =
- StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build();
- send(requestChunk);
- }
- };
- try (ChunkingByteStream s = new ChunkingByteStream(chunkWriter)) {
- pendingRequest.request.writeTo(s);
- } catch (IllegalStateException | IOException e) {
- LOG.info("Stream was broken, request will be retried when stream is reopened.", e);
+ synchronized (this) {
+ pending.put(id, pendingRequest);
+ for (int i = 0; i < serializedCommit.size(); i += COMMIT_STREAM_CHUNK_SIZE) {
+ int end = i + COMMIT_STREAM_CHUNK_SIZE;
+ ByteString chunk = serializedCommit.substring(i, Math.min(end, serializedCommit.size()));
+
+ StreamingCommitRequestChunk.Builder chunkBuilder =
+ StreamingCommitRequestChunk.newBuilder()
+ .setRequestId(id)
+ .setSerializedWorkItemCommit(chunk)
+ .setComputationId(pendingRequest.computation)
+ .setShardingKey(pendingRequest.request.getShardingKey());
+ int remaining = serializedCommit.size() - end;
+ if (remaining > 0) {
+ chunkBuilder.setRemainingBytesForWorkItem(remaining);
+ }
+
+ StreamingCommitWorkRequest requestChunk =
+ StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build();
+ try {
+ send(requestChunk);
+ } catch (IllegalStateException e) {
+ // Stream was broken, request will be retried when stream is reopened.
+ break;
+ }
+ }
}
}
}