Posted to github@beam.apache.org by "Abacn (via GitHub)" <gi...@apache.org> on 2023/04/13 22:01:48 UTC

[GitHub] [beam] Abacn commented on a diff in pull request #26267: Coalesce sources until compressed serialized bundles under API limit

Abacn commented on code in PR #26267:
URL: https://github.com/apache/beam/pull/26267#discussion_r1166071451


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java:
##########
@@ -247,18 +247,37 @@ private static <T> SourceOperationResponse performSplitTyped(
           serializedSize);
     }
 
+    List<BoundedSource<T>> bundlesBeforeCoalesce = bundles;
     int numBundlesBeforeRebundling = bundles.size();
     // To further reduce size of the response and service-side memory usage, coalesce
     // the sources into numBundlesLimit compressed serialized bundles.
-    if (bundles.size() > numBundlesLimit) {
+    while (serializedSize > apiByteLimit || bundles.size() > numBundlesLimit) {
+      // bundle size constrained by API limit, adds 5% allowance
+      int targetBundleSizeApiLimit = (int) (bundles.size() * apiByteLimit / serializedSize * 0.95);
+      // bundle size constrained by numBundlesLimit
+      int targetBundleSizeBundleLimit = Math.min(numBundlesLimit, bundles.size() - 1);
+      int targetBundleSize = Math.min(targetBundleSizeApiLimit, targetBundleSizeBundleLimit);
+
+      if (targetBundleSize <= 1) {
+        String message =
+            String.format(
+                "Unable to coalesce the sources into compressed serialized bundles to satisfy the "
+                    + "allowable limit when splitting %s. With %d bundles, total serialized size "
+                    + "of %d bytes is still larger than the limit %d. For more information, please "
+                    + "check the corresponding FAQ entry at "
+                    + "https://cloud.google.com/dataflow/pipelines/troubleshooting-your-pipeline",
+                source, bundles.size(), serializedSize, apiByteLimit);

Review Comment:
   This option applies only to BigQueryIO.read, whereas the bug is generic to the Dataflow worker. With that option, the read transform expands differently and likely avoids the problematic code path here (I haven't verified this).
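
For reference, the target-size arithmetic in the hunk above can be sketched in isolation. This is only an illustration under assumptions: the class name `CoalesceTargetSketch`, the method name `targetBundleCount`, the parameter types (`long` for the byte counts), and the numeric inputs are hypothetical and not taken from the PR.

```java
// Hypothetical stand-alone sketch of the target computation from the diff hunk.
// Assumption: serializedSize and apiByteLimit are long byte counts.
class CoalesceTargetSketch {

  static int targetBundleCount(
      int numBundles, long serializedSize, long apiByteLimit, int numBundlesLimit) {
    // Target constrained by the API byte limit, scaled by 0.95 as in the hunk.
    // Note that the long division truncates before the scaling is applied.
    int byApiLimit = (int) (numBundles * apiByteLimit / serializedSize * 0.95);
    // Target constrained by numBundlesLimit; always at least one fewer bundle
    // than the current count.
    int byBundleLimit = Math.min(numBundlesLimit, numBundles - 1);
    return Math.min(byApiLimit, byBundleLimit);
  }

  public static void main(String[] args) {
    // Made-up inputs: 40 MB serialized, 20 MB API limit, at most 100 bundles.
    System.out.println(targetBundleCount(1000, 40_000_000L, 20_000_000L, 100));
    System.out.println(targetBundleCount(100, 40_000_000L, 20_000_000L, 100));
    // With only 2 bundles left the target drops to 1 or below, which is the
    // condition under which the hunk builds its error message.
    System.out.println(targetBundleCount(2, 40_000_000L, 20_000_000L, 100));
  }
}
```

With these made-up inputs, the bundle-count limit dominates in the first call and the byte limit dominates in the second. The third call shows the case the `targetBundleSize <= 1` branch handles.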



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
