You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/03 18:12:06 UTC

[GitHub] [beam] dpcollins-google opened a new pull request #17004: Chunk commit requests dynamically

dpcollins-google opened a new pull request #17004:
URL: https://github.com/apache/beam/pull/17004


   This ensures they do not materialize all data at once for large requests
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dpcollins-google commented on a change in pull request #17004: Chunk commit requests dynamically

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #17004:
URL: https://github.com/apache/beam/pull/17004#discussion_r819664805



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
##########
@@ -1462,37 +1463,80 @@ private void issueBatchedRequest(Map<Long, PendingRequest> requests) {
       }
     }
 
-    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
-      Preconditions.checkNotNull(pendingRequest.computation);
-      final ByteString serializedCommit = pendingRequest.request.toByteString();
+    // An OutputStream which splits the output into chunks of no more than COMMIT_STREAM_CHUNK_SIZE
+    // before calling the chunkWriter on each.
+    //
+    // This avoids materializing the whole serialized request in the case it is large.
+    private class ChunkingByteStream extends OutputStream {
+      private final ByteString.Output output = ByteString.newOutput(COMMIT_STREAM_CHUNK_SIZE);
+      private final Consumer<ByteString> chunkWriter;
 
-      synchronized (this) {
-        pending.put(id, pendingRequest);
-        for (int i = 0; i < serializedCommit.size(); i += COMMIT_STREAM_CHUNK_SIZE) {
-          int end = i + COMMIT_STREAM_CHUNK_SIZE;
-          ByteString chunk = serializedCommit.substring(i, Math.min(end, serializedCommit.size()));
-
-          StreamingCommitRequestChunk.Builder chunkBuilder =
-              StreamingCommitRequestChunk.newBuilder()
-                  .setRequestId(id)
-                  .setSerializedWorkItemCommit(chunk)
-                  .setComputationId(pendingRequest.computation)
-                  .setShardingKey(pendingRequest.request.getShardingKey());
-          int remaining = serializedCommit.size() - end;
-          if (remaining > 0) {
-            chunkBuilder.setRemainingBytesForWorkItem(remaining);
-          }
+      ChunkingByteStream(Consumer<ByteString> chunkWriter) {
+        this.chunkWriter = chunkWriter;
+      }
 
-          StreamingCommitWorkRequest requestChunk =
-              StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build();
-          try {
-            send(requestChunk);
-          } catch (IllegalStateException e) {
-            // Stream was broken, request will be retried when stream is reopened.
-            break;
-          }
+      @Override
+      public void close() {
+        flushBytes();
+      }
+
+      @Override
+      public void write(int b) throws IOException {
+        output.write(b);
+        if (output.size() == COMMIT_STREAM_CHUNK_SIZE) {
+          flushBytes();
         }
       }
+
+      @Override
+      public void write(byte b[], int off, int len) throws IOException {
+        // Fast path for larger writes that don't make the chunk too large.
+        if (len + output.size() < COMMIT_STREAM_CHUNK_SIZE) {
+          output.write(b, off, len);
+          return;
+        }
+        for (int i = 0; i < len; i++) {
+          write(b[off + i]);
+        }
+      }
+
+      private void flushBytes() {
+        if (output.size() == 0) {
+          return;
+        }
+        chunkWriter.accept(output.toByteString());
+        output.reset();
+      }
+    }
+
+    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
+      Preconditions.checkNotNull(pendingRequest.computation);
+      Consumer<ByteString> chunkWriter =
+          new Consumer<ByteString>() {
+            private long remaining = pendingRequest.request.getSerializedSize();
+
+            @Override
+            public void accept(ByteString byteString) {

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit commented on pull request #17004: [BEAM-14116] Chunk commit requests dynamically

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #17004:
URL: https://github.com/apache/beam/pull/17004#issuecomment-1069382495


   Is there a jira for this that we can tag?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] TheNeuralBit merged pull request #17004: [BEAM-14116] Chunk commit requests dynamically

Posted by GitBox <gi...@apache.org>.
TheNeuralBit merged pull request #17004:
URL: https://github.com/apache/beam/pull/17004


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dpcollins-google commented on a change in pull request #17004: Chunk commit requests dynamically

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #17004:
URL: https://github.com/apache/beam/pull/17004#discussion_r818955643



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
##########
@@ -1462,36 +1463,54 @@ private void issueBatchedRequest(Map<Long, PendingRequest> requests) {
       }
     }
 
-    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
-      Preconditions.checkNotNull(pendingRequest.computation);
-      final ByteString serializedCommit = pendingRequest.request.toByteString();
+    private class ChunkingByteStream extends OutputStream {
+      private final ByteString.Output output = ByteString.newOutput(COMMIT_STREAM_CHUNK_SIZE);
+      private final Consumer<ByteString> chunkWriter;
 
-      synchronized (this) {
-        pending.put(id, pendingRequest);
-        for (int i = 0; i < serializedCommit.size(); i += COMMIT_STREAM_CHUNK_SIZE) {
-          int end = i + COMMIT_STREAM_CHUNK_SIZE;
-          ByteString chunk = serializedCommit.substring(i, Math.min(end, serializedCommit.size()));
-
-          StreamingCommitRequestChunk.Builder chunkBuilder =
-              StreamingCommitRequestChunk.newBuilder()
-                  .setRequestId(id)
-                  .setSerializedWorkItemCommit(chunk)
-                  .setComputationId(pendingRequest.computation)
-                  .setShardingKey(pendingRequest.request.getShardingKey());
-          int remaining = serializedCommit.size() - end;
-          if (remaining > 0) {
-            chunkBuilder.setRemainingBytesForWorkItem(remaining);
-          }
+      ChunkingByteStream(Consumer<ByteString> chunkWriter) {
+        this.chunkWriter = chunkWriter;
+      }
 
-          StreamingCommitWorkRequest requestChunk =
-              StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build();
-          try {
-            send(requestChunk);
-          } catch (IllegalStateException e) {
-            // Stream was broken, request will be retried when stream is reopened.
-            break;
-          }
+      @Override
+      public void close() {
+        flushBytes();
+      }
+
+      @Override
+      public void write(int b) throws IOException {
+        output.write(b);
+        if (output.size() == COMMIT_STREAM_CHUNK_SIZE) {
+          flushBytes();
+        }
+      }
+
+      private void flushBytes() {
+        chunkWriter.accept(output.toByteString());
+        output.reset();
+      }
+    }
+
+    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
+      Preconditions.checkNotNull(pendingRequest.computation);
+      AtomicLong remaining = new AtomicLong(pendingRequest.request.getSerializedSize());

Review comment:
       Done.

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
##########
@@ -1462,36 +1463,54 @@ private void issueBatchedRequest(Map<Long, PendingRequest> requests) {
       }
     }
 
-    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
-      Preconditions.checkNotNull(pendingRequest.computation);
-      final ByteString serializedCommit = pendingRequest.request.toByteString();
+    private class ChunkingByteStream extends OutputStream {
+      private final ByteString.Output output = ByteString.newOutput(COMMIT_STREAM_CHUNK_SIZE);
+      private final Consumer<ByteString> chunkWriter;
 
-      synchronized (this) {
-        pending.put(id, pendingRequest);
-        for (int i = 0; i < serializedCommit.size(); i += COMMIT_STREAM_CHUNK_SIZE) {
-          int end = i + COMMIT_STREAM_CHUNK_SIZE;
-          ByteString chunk = serializedCommit.substring(i, Math.min(end, serializedCommit.size()));
-
-          StreamingCommitRequestChunk.Builder chunkBuilder =
-              StreamingCommitRequestChunk.newBuilder()
-                  .setRequestId(id)
-                  .setSerializedWorkItemCommit(chunk)
-                  .setComputationId(pendingRequest.computation)
-                  .setShardingKey(pendingRequest.request.getShardingKey());
-          int remaining = serializedCommit.size() - end;
-          if (remaining > 0) {
-            chunkBuilder.setRemainingBytesForWorkItem(remaining);
-          }
+      ChunkingByteStream(Consumer<ByteString> chunkWriter) {
+        this.chunkWriter = chunkWriter;
+      }
 
-          StreamingCommitWorkRequest requestChunk =
-              StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build();
-          try {
-            send(requestChunk);
-          } catch (IllegalStateException e) {
-            // Stream was broken, request will be retried when stream is reopened.
-            break;
-          }
+      @Override
+      public void close() {
+        flushBytes();
+      }
+
+      @Override
+      public void write(int b) throws IOException {

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dpcollins-google commented on pull request #17004: Chunk commit requests dynamically

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on pull request #17004:
URL: https://github.com/apache/beam/pull/17004#issuecomment-1059260087


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] youngoli commented on pull request #17004: [BEAM-14116] Chunk commit requests dynamically

Posted by GitBox <gi...@apache.org>.
youngoli commented on pull request #17004:
URL: https://github.com/apache/beam/pull/17004#issuecomment-1078392305


   I'm open to cherry-picking in a forward fix since this was caught so soon after the cut. I just need an estimate of how long a fix would take to implement (I can wait a bit for this to get triaged so we can get an estimate). If a fix would take more than three days or so then probably better to just roll back.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dpcollins-google commented on a change in pull request #17004: Chunk commit requests dynamically

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #17004:
URL: https://github.com/apache/beam/pull/17004#discussion_r820958813



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
##########
@@ -1462,37 +1463,82 @@ private void issueBatchedRequest(Map<Long, PendingRequest> requests) {
       }
     }
 
-    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
-      Preconditions.checkNotNull(pendingRequest.computation);
-      final ByteString serializedCommit = pendingRequest.request.toByteString();
+    // An OutputStream which splits the output into chunks of no more than COMMIT_STREAM_CHUNK_SIZE
+    // before calling the chunkWriter on each.
+    //
+    // This avoids materializing the whole serialized request in the case it is large.
+    private class ChunkingByteStream extends OutputStream {
+      private final ByteString.Output output = ByteString.newOutput(COMMIT_STREAM_CHUNK_SIZE);
+      private final Consumer<ByteString> chunkWriter;
 
-      synchronized (this) {
-        pending.put(id, pendingRequest);
-        for (int i = 0; i < serializedCommit.size(); i += COMMIT_STREAM_CHUNK_SIZE) {
-          int end = i + COMMIT_STREAM_CHUNK_SIZE;
-          ByteString chunk = serializedCommit.substring(i, Math.min(end, serializedCommit.size()));
-
-          StreamingCommitRequestChunk.Builder chunkBuilder =
-              StreamingCommitRequestChunk.newBuilder()
-                  .setRequestId(id)
-                  .setSerializedWorkItemCommit(chunk)
-                  .setComputationId(pendingRequest.computation)
-                  .setShardingKey(pendingRequest.request.getShardingKey());
-          int remaining = serializedCommit.size() - end;
-          if (remaining > 0) {
-            chunkBuilder.setRemainingBytesForWorkItem(remaining);
-          }
+      ChunkingByteStream(Consumer<ByteString> chunkWriter) {
+        this.chunkWriter = chunkWriter;
+      }
 
-          StreamingCommitWorkRequest requestChunk =
-              StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build();
-          try {
-            send(requestChunk);
-          } catch (IllegalStateException e) {
-            // Stream was broken, request will be retried when stream is reopened.
-            break;
-          }
+      @Override
+      public void close() {
+        flushBytes();
+      }
+
+      @Override
+      public void write(int b) throws IOException {
+        output.write(b);
+        if (output.size() == COMMIT_STREAM_CHUNK_SIZE) {
+          flushBytes();
         }
       }
+
+      @Override
+      public void write(byte b[], int currentOffset, int len) throws IOException {
+        final int endOffset = currentOffset + len;
+        while ((endOffset - currentOffset) + output.size() >= COMMIT_STREAM_CHUNK_SIZE) {
+          int writeSize = COMMIT_STREAM_CHUNK_SIZE - output.size();
+          output.write(b, currentOffset, writeSize);
+          currentOffset += writeSize;
+          flushBytes();
+        }
+        if (currentOffset != endOffset) {
+          output.write(b, currentOffset, endOffset - currentOffset);
+        }
+      }
+
+      private void flushBytes() {
+        if (output.size() == 0) {
+          return;
+        }
+        chunkWriter.accept(output.toByteString());
+        output.reset();
+      }
+    }
+
+    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
+      Preconditions.checkNotNull(pendingRequest.computation);
+      Consumer<ByteString> chunkWriter =
+          new Consumer<ByteString>() {
+            private long remaining = pendingRequest.request.getSerializedSize();
+
+            @Override
+            public void accept(ByteString chunk) {
+              StreamingCommitRequestChunk.Builder chunkBuilder =
+                  StreamingCommitRequestChunk.newBuilder()
+                      .setRequestId(id)
+                      .setSerializedWorkItemCommit(chunk)
+                      .setComputationId(pendingRequest.computation)
+                      .setShardingKey(pendingRequest.request.getShardingKey());
+              remaining -= chunk.size();

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dpcollins-google commented on a change in pull request #17004: Chunk commit requests dynamically

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #17004:
URL: https://github.com/apache/beam/pull/17004#discussion_r819664655



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
##########
@@ -1462,37 +1463,80 @@ private void issueBatchedRequest(Map<Long, PendingRequest> requests) {
       }
     }
 
-    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
-      Preconditions.checkNotNull(pendingRequest.computation);
-      final ByteString serializedCommit = pendingRequest.request.toByteString();
+    // An OutputStream which splits the output into chunks of no more than COMMIT_STREAM_CHUNK_SIZE
+    // before calling the chunkWriter on each.
+    //
+    // This avoids materializing the whole serialized request in the case it is large.
+    private class ChunkingByteStream extends OutputStream {
+      private final ByteString.Output output = ByteString.newOutput(COMMIT_STREAM_CHUNK_SIZE);
+      private final Consumer<ByteString> chunkWriter;
 
-      synchronized (this) {
-        pending.put(id, pendingRequest);
-        for (int i = 0; i < serializedCommit.size(); i += COMMIT_STREAM_CHUNK_SIZE) {
-          int end = i + COMMIT_STREAM_CHUNK_SIZE;
-          ByteString chunk = serializedCommit.substring(i, Math.min(end, serializedCommit.size()));
-
-          StreamingCommitRequestChunk.Builder chunkBuilder =
-              StreamingCommitRequestChunk.newBuilder()
-                  .setRequestId(id)
-                  .setSerializedWorkItemCommit(chunk)
-                  .setComputationId(pendingRequest.computation)
-                  .setShardingKey(pendingRequest.request.getShardingKey());
-          int remaining = serializedCommit.size() - end;
-          if (remaining > 0) {
-            chunkBuilder.setRemainingBytesForWorkItem(remaining);
-          }
+      ChunkingByteStream(Consumer<ByteString> chunkWriter) {
+        this.chunkWriter = chunkWriter;
+      }
 
-          StreamingCommitWorkRequest requestChunk =
-              StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build();
-          try {
-            send(requestChunk);
-          } catch (IllegalStateException e) {
-            // Stream was broken, request will be retried when stream is reopened.
-            break;
-          }
+      @Override
+      public void close() {
+        flushBytes();
+      }
+
+      @Override
+      public void write(int b) throws IOException {
+        output.write(b);
+        if (output.size() == COMMIT_STREAM_CHUNK_SIZE) {
+          flushBytes();
         }
       }
+
+      @Override
+      public void write(byte b[], int off, int len) throws IOException {
+        // Fast path for larger writes that don't make the chunk too large.
+        if (len + output.size() < COMMIT_STREAM_CHUNK_SIZE) {
+          output.write(b, off, len);
+          return;
+        }
+        for (int i = 0; i < len; i++) {

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] scwhittle commented on pull request #17004: Chunk commit requests dynamically

Posted by GitBox <gi...@apache.org>.
scwhittle commented on pull request #17004:
URL: https://github.com/apache/beam/pull/17004#issuecomment-1066977454


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dpcollins-google closed pull request #17004: Chunk commit requests dynamically

Posted by GitBox <gi...@apache.org>.
dpcollins-google closed pull request #17004:
URL: https://github.com/apache/beam/pull/17004


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] scwhittle commented on a change in pull request #17004: Chunk commit requests dynamically

Posted by GitBox <gi...@apache.org>.
scwhittle commented on a change in pull request #17004:
URL: https://github.com/apache/beam/pull/17004#discussion_r818942554



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
##########
@@ -1462,36 +1463,54 @@ private void issueBatchedRequest(Map<Long, PendingRequest> requests) {
       }
     }
 
-    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
-      Preconditions.checkNotNull(pendingRequest.computation);
-      final ByteString serializedCommit = pendingRequest.request.toByteString();
+    private class ChunkingByteStream extends OutputStream {
+      private final ByteString.Output output = ByteString.newOutput(COMMIT_STREAM_CHUNK_SIZE);
+      private final Consumer<ByteString> chunkWriter;
 
-      synchronized (this) {
-        pending.put(id, pendingRequest);
-        for (int i = 0; i < serializedCommit.size(); i += COMMIT_STREAM_CHUNK_SIZE) {
-          int end = i + COMMIT_STREAM_CHUNK_SIZE;
-          ByteString chunk = serializedCommit.substring(i, Math.min(end, serializedCommit.size()));
-
-          StreamingCommitRequestChunk.Builder chunkBuilder =
-              StreamingCommitRequestChunk.newBuilder()
-                  .setRequestId(id)
-                  .setSerializedWorkItemCommit(chunk)
-                  .setComputationId(pendingRequest.computation)
-                  .setShardingKey(pendingRequest.request.getShardingKey());
-          int remaining = serializedCommit.size() - end;
-          if (remaining > 0) {
-            chunkBuilder.setRemainingBytesForWorkItem(remaining);
-          }
+      ChunkingByteStream(Consumer<ByteString> chunkWriter) {
+        this.chunkWriter = chunkWriter;
+      }
 
-          StreamingCommitWorkRequest requestChunk =
-              StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build();
-          try {
-            send(requestChunk);
-          } catch (IllegalStateException e) {
-            // Stream was broken, request will be retried when stream is reopened.
-            break;
-          }
+      @Override
+      public void close() {
+        flushBytes();
+      }
+
+      @Override
+      public void write(int b) throws IOException {

Review comment:
       I see that it is recommended to override the array methods for performance:
   https://docs.oracle.com/javase/7/docs/api/java/io/OutputStream.html#write(byte[],%20int,%20int)

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
##########
@@ -1462,36 +1463,54 @@ private void issueBatchedRequest(Map<Long, PendingRequest> requests) {
       }
     }
 
-    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
-      Preconditions.checkNotNull(pendingRequest.computation);
-      final ByteString serializedCommit = pendingRequest.request.toByteString();
+    private class ChunkingByteStream extends OutputStream {

Review comment:
       add comment with reason for this

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
##########
@@ -1462,36 +1463,54 @@ private void issueBatchedRequest(Map<Long, PendingRequest> requests) {
       }
     }
 
-    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
-      Preconditions.checkNotNull(pendingRequest.computation);
-      final ByteString serializedCommit = pendingRequest.request.toByteString();
+    private class ChunkingByteStream extends OutputStream {
+      private final ByteString.Output output = ByteString.newOutput(COMMIT_STREAM_CHUNK_SIZE);
+      private final Consumer<ByteString> chunkWriter;
 
-      synchronized (this) {
-        pending.put(id, pendingRequest);
-        for (int i = 0; i < serializedCommit.size(); i += COMMIT_STREAM_CHUNK_SIZE) {
-          int end = i + COMMIT_STREAM_CHUNK_SIZE;
-          ByteString chunk = serializedCommit.substring(i, Math.min(end, serializedCommit.size()));
-
-          StreamingCommitRequestChunk.Builder chunkBuilder =
-              StreamingCommitRequestChunk.newBuilder()
-                  .setRequestId(id)
-                  .setSerializedWorkItemCommit(chunk)
-                  .setComputationId(pendingRequest.computation)
-                  .setShardingKey(pendingRequest.request.getShardingKey());
-          int remaining = serializedCommit.size() - end;
-          if (remaining > 0) {
-            chunkBuilder.setRemainingBytesForWorkItem(remaining);
-          }
+      ChunkingByteStream(Consumer<ByteString> chunkWriter) {
+        this.chunkWriter = chunkWriter;
+      }
 
-          StreamingCommitWorkRequest requestChunk =
-              StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build();
-          try {
-            send(requestChunk);
-          } catch (IllegalStateException e) {
-            // Stream was broken, request will be retried when stream is reopened.
-            break;
-          }
+      @Override
+      public void close() {
+        flushBytes();
+      }
+
+      @Override
+      public void write(int b) throws IOException {
+        output.write(b);
+        if (output.size() == COMMIT_STREAM_CHUNK_SIZE) {
+          flushBytes();
+        }
+      }
+
+      private void flushBytes() {
+        chunkWriter.accept(output.toByteString());
+        output.reset();
+      }
+    }
+
+    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
+      Preconditions.checkNotNull(pendingRequest.computation);
+      AtomicLong remaining = new AtomicLong(pendingRequest.request.getSerializedSize());

Review comment:
       Don't think this needs to be atomic, everything is happening on this thread




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] scwhittle commented on a change in pull request #17004: Chunk commit requests dynamically

Posted by GitBox <gi...@apache.org>.
scwhittle commented on a change in pull request #17004:
URL: https://github.com/apache/beam/pull/17004#discussion_r820493181



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
##########
@@ -1462,37 +1463,82 @@ private void issueBatchedRequest(Map<Long, PendingRequest> requests) {
       }
     }
 
-    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
-      Preconditions.checkNotNull(pendingRequest.computation);
-      final ByteString serializedCommit = pendingRequest.request.toByteString();
+    // An OutputStream which splits the output into chunks of no more than COMMIT_STREAM_CHUNK_SIZE
+    // before calling the chunkWriter on each.
+    //
+    // This avoids materializing the whole serialized request in the case it is large.
+    private class ChunkingByteStream extends OutputStream {
+      private final ByteString.Output output = ByteString.newOutput(COMMIT_STREAM_CHUNK_SIZE);
+      private final Consumer<ByteString> chunkWriter;
 
-      synchronized (this) {
-        pending.put(id, pendingRequest);
-        for (int i = 0; i < serializedCommit.size(); i += COMMIT_STREAM_CHUNK_SIZE) {
-          int end = i + COMMIT_STREAM_CHUNK_SIZE;
-          ByteString chunk = serializedCommit.substring(i, Math.min(end, serializedCommit.size()));
-
-          StreamingCommitRequestChunk.Builder chunkBuilder =
-              StreamingCommitRequestChunk.newBuilder()
-                  .setRequestId(id)
-                  .setSerializedWorkItemCommit(chunk)
-                  .setComputationId(pendingRequest.computation)
-                  .setShardingKey(pendingRequest.request.getShardingKey());
-          int remaining = serializedCommit.size() - end;
-          if (remaining > 0) {
-            chunkBuilder.setRemainingBytesForWorkItem(remaining);
-          }
+      ChunkingByteStream(Consumer<ByteString> chunkWriter) {
+        this.chunkWriter = chunkWriter;
+      }
 
-          StreamingCommitWorkRequest requestChunk =
-              StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build();
-          try {
-            send(requestChunk);
-          } catch (IllegalStateException e) {
-            // Stream was broken, request will be retried when stream is reopened.
-            break;
-          }
+      @Override
+      public void close() {
+        flushBytes();
+      }
+
+      @Override
+      public void write(int b) throws IOException {
+        output.write(b);
+        if (output.size() == COMMIT_STREAM_CHUNK_SIZE) {
+          flushBytes();
         }
       }
+
+      @Override
+      public void write(byte b[], int currentOffset, int len) throws IOException {
+        final int endOffset = currentOffset + len;
+        while ((endOffset - currentOffset) + output.size() >= COMMIT_STREAM_CHUNK_SIZE) {
+          int writeSize = COMMIT_STREAM_CHUNK_SIZE - output.size();
+          output.write(b, currentOffset, writeSize);
+          currentOffset += writeSize;
+          flushBytes();
+        }
+        if (currentOffset != endOffset) {
+          output.write(b, currentOffset, endOffset - currentOffset);
+        }
+      }
+
+      private void flushBytes() {
+        if (output.size() == 0) {
+          return;
+        }
+        chunkWriter.accept(output.toByteString());
+        output.reset();
+      }
+    }
+
+    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
+      Preconditions.checkNotNull(pendingRequest.computation);
+      Consumer<ByteString> chunkWriter =
+          new Consumer<ByteString>() {
+            private long remaining = pendingRequest.request.getSerializedSize();
+
+            @Override
+            public void accept(ByteString chunk) {
+              StreamingCommitRequestChunk.Builder chunkBuilder =
+                  StreamingCommitRequestChunk.newBuilder()
+                      .setRequestId(id)
+                      .setSerializedWorkItemCommit(chunk)
+                      .setComputationId(pendingRequest.computation)
+                      .setShardingKey(pendingRequest.request.getShardingKey());
+              remaining -= chunk.size();

Review comment:
       assert remaining >= chunk.size()?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] scwhittle commented on a change in pull request #17004: Chunk commit requests dynamically

Posted by GitBox <gi...@apache.org>.
scwhittle commented on a change in pull request #17004:
URL: https://github.com/apache/beam/pull/17004#discussion_r818968039



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
##########
@@ -1462,37 +1463,80 @@ private void issueBatchedRequest(Map<Long, PendingRequest> requests) {
       }
     }
 
-    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
-      Preconditions.checkNotNull(pendingRequest.computation);
-      final ByteString serializedCommit = pendingRequest.request.toByteString();
+    // An OutputStream which splits the output into chunks of no more than COMMIT_STREAM_CHUNK_SIZE
+    // before calling the chunkWriter on each.
+    //
+    // This avoids materializing the whole serialized request in the case it is large.
+    private class ChunkingByteStream extends OutputStream {
+      private final ByteString.Output output = ByteString.newOutput(COMMIT_STREAM_CHUNK_SIZE);
+      private final Consumer<ByteString> chunkWriter;
 
-      synchronized (this) {
-        pending.put(id, pendingRequest);
-        for (int i = 0; i < serializedCommit.size(); i += COMMIT_STREAM_CHUNK_SIZE) {
-          int end = i + COMMIT_STREAM_CHUNK_SIZE;
-          ByteString chunk = serializedCommit.substring(i, Math.min(end, serializedCommit.size()));
-
-          StreamingCommitRequestChunk.Builder chunkBuilder =
-              StreamingCommitRequestChunk.newBuilder()
-                  .setRequestId(id)
-                  .setSerializedWorkItemCommit(chunk)
-                  .setComputationId(pendingRequest.computation)
-                  .setShardingKey(pendingRequest.request.getShardingKey());
-          int remaining = serializedCommit.size() - end;
-          if (remaining > 0) {
-            chunkBuilder.setRemainingBytesForWorkItem(remaining);
-          }
+      ChunkingByteStream(Consumer<ByteString> chunkWriter) {
+        this.chunkWriter = chunkWriter;
+      }
 
-          StreamingCommitWorkRequest requestChunk =
-              StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build();
-          try {
-            send(requestChunk);
-          } catch (IllegalStateException e) {
-            // Stream was broken, request will be retried when stream is reopened.
-            break;
-          }
+      @Override
+      public void close() {
+        flushBytes();
+      }
+
+      @Override
+      public void write(int b) throws IOException {
+        output.write(b);
+        if (output.size() == COMMIT_STREAM_CHUNK_SIZE) {
+          flushBytes();
         }
       }
+
+      @Override
+      public void write(byte b[], int off, int len) throws IOException {
+        // Fast path for larger writes that don't make the chunk too large.
+        if (len + output.size() < COMMIT_STREAM_CHUNK_SIZE) {
+          output.write(b, off, len);
+          return;
+        }
+        for (int i = 0; i < len; i++) {
+          write(b[off + i]);
+        }
+      }
+
+      private void flushBytes() {
+        if (output.size() == 0) {
+          return;
+        }
+        chunkWriter.accept(output.toByteString());
+        output.reset();
+      }
+    }
+
+    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
+      Preconditions.checkNotNull(pendingRequest.computation);
+      Consumer<ByteString> chunkWriter =
+          new Consumer<ByteString>() {
+            private long remaining = pendingRequest.request.getSerializedSize();
+
+            @Override
+            public void accept(ByteString byteString) {

Review comment:
       seems like byteString should be chunk

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
##########
@@ -1462,37 +1463,80 @@ private void issueBatchedRequest(Map<Long, PendingRequest> requests) {
       }
     }
 
-    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
-      Preconditions.checkNotNull(pendingRequest.computation);
-      final ByteString serializedCommit = pendingRequest.request.toByteString();
+    // An OutputStream which splits the output into chunks of no more than COMMIT_STREAM_CHUNK_SIZE
+    // before calling the chunkWriter on each.
+    //
+    // This avoids materializing the whole serialized request in the case it is large.
+    private class ChunkingByteStream extends OutputStream {
+      private final ByteString.Output output = ByteString.newOutput(COMMIT_STREAM_CHUNK_SIZE);
+      private final Consumer<ByteString> chunkWriter;
 
-      synchronized (this) {
-        pending.put(id, pendingRequest);
-        for (int i = 0; i < serializedCommit.size(); i += COMMIT_STREAM_CHUNK_SIZE) {
-          int end = i + COMMIT_STREAM_CHUNK_SIZE;
-          ByteString chunk = serializedCommit.substring(i, Math.min(end, serializedCommit.size()));
-
-          StreamingCommitRequestChunk.Builder chunkBuilder =
-              StreamingCommitRequestChunk.newBuilder()
-                  .setRequestId(id)
-                  .setSerializedWorkItemCommit(chunk)
-                  .setComputationId(pendingRequest.computation)
-                  .setShardingKey(pendingRequest.request.getShardingKey());
-          int remaining = serializedCommit.size() - end;
-          if (remaining > 0) {
-            chunkBuilder.setRemainingBytesForWorkItem(remaining);
-          }
+      ChunkingByteStream(Consumer<ByteString> chunkWriter) {
+        this.chunkWriter = chunkWriter;
+      }
 
-          StreamingCommitWorkRequest requestChunk =
-              StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build();
-          try {
-            send(requestChunk);
-          } catch (IllegalStateException e) {
-            // Stream was broken, request will be retried when stream is reopened.
-            break;
-          }
+      @Override
+      public void close() {
+        flushBytes();
+      }
+
+      @Override
+      public void write(int b) throws IOException {
+        output.write(b);
+        if (output.size() == COMMIT_STREAM_CHUNK_SIZE) {
+          flushBytes();
         }
       }
+
+      @Override
+      public void write(byte b[], int off, int len) throws IOException {
+        // Fast path for larger writes that don't make the chunk too large.
+        if (len + output.size() < COMMIT_STREAM_CHUNK_SIZE) {
+          output.write(b, off, len);
+          return;
+        }
+        for (int i = 0; i < len; i++) {

Review comment:
       I think this could get called with large things, ie individual encoded message payloads (which can be up to 80MB). Can you
   - fill remaning in output and flush
   - while remaining bytes is greater than chunk size flush directly without copying to output
   - put the rest in output




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dpcollins-google commented on a change in pull request #17004: Chunk commit requests dynamically

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #17004:
URL: https://github.com/apache/beam/pull/17004#discussion_r818944404



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
##########
@@ -1462,36 +1463,54 @@ private void issueBatchedRequest(Map<Long, PendingRequest> requests) {
       }
     }
 
-    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
-      Preconditions.checkNotNull(pendingRequest.computation);
-      final ByteString serializedCommit = pendingRequest.request.toByteString();
+    private class ChunkingByteStream extends OutputStream {
+      private final ByteString.Output output = ByteString.newOutput(COMMIT_STREAM_CHUNK_SIZE);
+      private final Consumer<ByteString> chunkWriter;
 
-      synchronized (this) {
-        pending.put(id, pendingRequest);
-        for (int i = 0; i < serializedCommit.size(); i += COMMIT_STREAM_CHUNK_SIZE) {
-          int end = i + COMMIT_STREAM_CHUNK_SIZE;
-          ByteString chunk = serializedCommit.substring(i, Math.min(end, serializedCommit.size()));
-
-          StreamingCommitRequestChunk.Builder chunkBuilder =
-              StreamingCommitRequestChunk.newBuilder()
-                  .setRequestId(id)
-                  .setSerializedWorkItemCommit(chunk)
-                  .setComputationId(pendingRequest.computation)
-                  .setShardingKey(pendingRequest.request.getShardingKey());
-          int remaining = serializedCommit.size() - end;
-          if (remaining > 0) {
-            chunkBuilder.setRemainingBytesForWorkItem(remaining);
-          }
+      ChunkingByteStream(Consumer<ByteString> chunkWriter) {
+        this.chunkWriter = chunkWriter;
+      }
 
-          StreamingCommitWorkRequest requestChunk =
-              StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build();
-          try {
-            send(requestChunk);
-          } catch (IllegalStateException e) {
-            // Stream was broken, request will be retried when stream is reopened.
-            break;
-          }
+      @Override
+      public void close() {
+        flushBytes();
+      }
+
+      @Override
+      public void write(int b) throws IOException {
+        output.write(b);
+        if (output.size() == COMMIT_STREAM_CHUNK_SIZE) {
+          flushBytes();
+        }
+      }
+
+      private void flushBytes() {
+        chunkWriter.accept(output.toByteString());
+        output.reset();
+      }
+    }
+
+    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
+      Preconditions.checkNotNull(pendingRequest.computation);
+      AtomicLong remaining = new AtomicLong(pendingRequest.request.getSerializedSize());

Review comment:
       It needs to be atomic because everything captured by a closure needs to be effectively final... I could alternatively have an explicit anonymous class if you'd prefer?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] scwhittle commented on a change in pull request #17004: Chunk commit requests dynamically

Posted by GitBox <gi...@apache.org>.
scwhittle commented on a change in pull request #17004:
URL: https://github.com/apache/beam/pull/17004#discussion_r818951642



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
##########
@@ -1462,36 +1463,54 @@ private void issueBatchedRequest(Map<Long, PendingRequest> requests) {
       }
     }
 
-    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
-      Preconditions.checkNotNull(pendingRequest.computation);
-      final ByteString serializedCommit = pendingRequest.request.toByteString();
+    private class ChunkingByteStream extends OutputStream {
+      private final ByteString.Output output = ByteString.newOutput(COMMIT_STREAM_CHUNK_SIZE);
+      private final Consumer<ByteString> chunkWriter;
 
-      synchronized (this) {
-        pending.put(id, pendingRequest);
-        for (int i = 0; i < serializedCommit.size(); i += COMMIT_STREAM_CHUNK_SIZE) {
-          int end = i + COMMIT_STREAM_CHUNK_SIZE;
-          ByteString chunk = serializedCommit.substring(i, Math.min(end, serializedCommit.size()));
-
-          StreamingCommitRequestChunk.Builder chunkBuilder =
-              StreamingCommitRequestChunk.newBuilder()
-                  .setRequestId(id)
-                  .setSerializedWorkItemCommit(chunk)
-                  .setComputationId(pendingRequest.computation)
-                  .setShardingKey(pendingRequest.request.getShardingKey());
-          int remaining = serializedCommit.size() - end;
-          if (remaining > 0) {
-            chunkBuilder.setRemainingBytesForWorkItem(remaining);
-          }
+      ChunkingByteStream(Consumer<ByteString> chunkWriter) {
+        this.chunkWriter = chunkWriter;
+      }
 
-          StreamingCommitWorkRequest requestChunk =
-              StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build();
-          try {
-            send(requestChunk);
-          } catch (IllegalStateException e) {
-            // Stream was broken, request will be retried when stream is reopened.
-            break;
-          }
+      @Override
+      public void close() {
+        flushBytes();
+      }
+
+      @Override
+      public void write(int b) throws IOException {
+        output.write(b);
+        if (output.size() == COMMIT_STREAM_CHUNK_SIZE) {
+          flushBytes();
+        }
+      }
+
+      private void flushBytes() {
+        chunkWriter.accept(output.toByteString());
+        output.reset();
+      }
+    }
+
+    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
+      Preconditions.checkNotNull(pendingRequest.computation);
+      AtomicLong remaining = new AtomicLong(pendingRequest.request.getSerializedSize());

Review comment:
       Ah. Yes, let's do that class so it isn't necessary for that reason. I think it will make the threading clearer




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] dpcollins-google commented on a change in pull request #17004: Chunk commit requests dynamically

Posted by GitBox <gi...@apache.org>.
dpcollins-google commented on a change in pull request #17004:
URL: https://github.com/apache/beam/pull/17004#discussion_r818955871



##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
##########
@@ -1462,36 +1463,54 @@ private void issueBatchedRequest(Map<Long, PendingRequest> requests) {
       }
     }
 
-    private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest) {
-      Preconditions.checkNotNull(pendingRequest.computation);
-      final ByteString serializedCommit = pendingRequest.request.toByteString();
+    private class ChunkingByteStream extends OutputStream {

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on pull request #17004: [BEAM-14116] Chunk commit requests dynamically

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #17004:
URL: https://github.com/apache/beam/pull/17004#issuecomment-1078149645


   This change breaks the cloud/dataflow/testing/integration/sdk:streaming_GroupByKeyTest_BasicTests_testLargeKeys1MB_wm_service_local test reliably for Dataflow.
   
   @youngoli Resolution of whether this should be rolled back or forward fixed looks like will impact the 2.38 release.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org