Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/30 02:08:04 UTC

[GitHub] [beam] nbali opened a new pull request, #22953: Fix for #22951

nbali opened a new pull request, #22953:
URL: https://github.com/apache/beam/pull/22953

   Closes #22951
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1319554491

   Waiting for https://github.com/apache/beam/issues/20819




[GitHub] [beam] lukecwik commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1332838200

   Run Dataflow Streaming ValidatesRunner




[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1243711217

   @lukecwik Can I use the same PipelineOptions there as well? Does it use the same network layer?




[GitHub] [beam] nbali commented on a diff in pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r1024072638


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java:
##########
@@ -424,13 +465,40 @@ public void processElement(
         BoundedWindow window,
         OutputReceiver<KV<K, Iterable<InputT>>> receiver) {
       LOG.debug("*** BATCH *** Add element for window {} ", window);
+      if (shouldCareAboutWeight()) {
+        final long elementWeight = weigher.apply(element.getValue());
+        if (elementWeight + storedBatchSizeBytes.read() > batchSizeBytes) {

Review Comment:
   TBH I wasn't sure how the `read()`/`readLater()` implementation works: if we read a value once, is it cached for the whole duration, or is it fetched again? I assumed that `readLater()`, as every prefetching method should be, is already optimized to be a no-op for values that are already present, so the worst case is one unnecessary no-op call.
   
   So, to sum up: does that mean a value returned by a `read()` call will always be available from that point forward? Anyway, I modified the PR code to reflect this.
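
The weight check in the hunk above can be illustrated with a plain-Java toy model. This is only a sketch under stated assumptions, not the code from the PR: the class `WeightedBatcherSketch`, its `emitted()` accessor, and the rule that a batch is emitted as soon as it reaches the limit are all hypothetical, and Beam state cells are replaced by ordinary fields.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

/** Toy, single-threaded model of a byte-size-limited batcher. Not Beam code. */
final class WeightedBatcherSketch<T> {
  private final long batchSizeBytes;        // byte limit per batch
  private final ToLongFunction<T> weigher;  // stands in for `weigher` in the hunk
  private final List<T> buffer = new ArrayList<>();
  private final List<List<T>> emitted = new ArrayList<>();
  private long storedBatchSizeBytes = 0L;   // stands in for the stored-size state cell

  WeightedBatcherSketch(long batchSizeBytes, ToLongFunction<T> weigher) {
    this.batchSizeBytes = batchSizeBytes;
    this.weigher = weigher;
  }

  void add(T element) {
    long elementWeight = weigher.applyAsLong(element);
    // Pre-check as in the hunk: flush first if this element would exceed the limit.
    if (elementWeight + storedBatchSizeBytes > batchSizeBytes) {
      flush();
    }
    buffer.add(element);
    storedBatchSizeBytes += elementWeight;
    // Assumption of this sketch: a batch that has reached the limit is emitted at once.
    if (storedBatchSizeBytes >= batchSizeBytes) {
      flush();
    }
  }

  /** Emits the buffered elements as one batch; does nothing if the buffer is empty. */
  void flush() {
    if (!buffer.isEmpty()) {
      emitted.add(new ArrayList<>(buffer));
      buffer.clear();
      storedBatchSizeBytes = 0L;
    }
  }

  List<List<T>> emitted() {
    return emitted;
  }

  /** Runs five strings through a batcher with a 10-byte limit. */
  static List<List<String>> demo() {
    WeightedBatcherSketch<String> batcher =
        new WeightedBatcherSketch<>(10, s -> s.getBytes(StandardCharsets.UTF_8).length);
    for (String s : Arrays.asList("aaaa", "bbbb", "cccc", "dddddddddddd", "e")) {
      batcher.add(s);
    }
    batcher.flush(); // emit whatever is left at the end of input
    return batcher.emitted();
  }

  public static void main(String[] args) {
    // prints [[aaaa, bbbb], [cccc], [dddddddddddd], [e]]
    System.out.println(demo());
  }
}
```

With the pre-check, an oversized element ends up alone in its own batch instead of being appended to a batch that is already partly full.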





[GitHub] [beam] nbali commented on a diff in pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r963987206


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java:
##########
@@ -113,6 +113,9 @@
   // If user triggering is supplied, we will trigger the file write after this many records are
   // written.
   static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
+  // If user triggering is supplied, we will trigger the file write after this many bytes are
+  // written.
+  static final long FILE_TRIGGERING_BYTE_COUNT = 100 * (1L << 20); // 100MiB

Review Comment:
   @lukecwik Having the same limit as a buffer actually makes sense to me, but can you point me to where I might find that limit? I can see it in the comments for `DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE`, but instead of hardcoding 64MB here as well, I would rather reference the original limit directly.
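
For reference, the arithmetic behind these byte constants can be checked in isolation. The sketch below is only illustrative: apart from `FILE_TRIGGERING_BYTE_COUNT`, every name is hypothetical, and reading "64MB" as 64 MiB is an assumption. It also shows why the expression uses `1L << 20` rather than `1 << 20`.

```java
/** Arithmetic behind the byte constants; names other than FILE_TRIGGERING_BYTE_COUNT are hypothetical. */
final class ByteCountSketch {
  static final long MIB = 1L << 20; // 1,048,576 bytes

  // Same expression as in the hunk: 100 MiB.
  static final long FILE_TRIGGERING_BYTE_COUNT = 100 * (1L << 20);

  // Hypothetical shared constant for the 64MB buffer limit, read here as 64 MiB.
  static final long SHARED_BUFFER_LIMIT_BYTES = 64 * MIB;

  /** Multiplies in int arithmetic, so the product wraps around before it is widened to long. */
  static long inIntArithmetic(int mebibytes) {
    return mebibytes * (1 << 20);
  }

  /** Multiplies in long arithmetic, because the shifted literal is a long. */
  static long inLongArithmetic(int mebibytes) {
    return mebibytes * (1L << 20);
  }

  public static void main(String[] args) {
    System.out.println(FILE_TRIGGERING_BYTE_COUNT); // prints 104857600
    System.out.println(SHARED_BUFFER_LIMIT_BYTES);  // prints 67108864
    System.out.println(inIntArithmetic(4096));      // prints 0
    System.out.println(inLongArithmetic(4096));     // prints 4294967296
  }
}
```

Referencing one shared constant, as the comment above proposes, keeps the trigger and the buffer limit from drifting apart if either value changes.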





[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1232364060

   Run Java PreCommit




[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1318024721

   I'm not sure why [org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInGlobalWindowBatchSizeByteSizeFn](https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/24830/testReport/junit/org.apache.beam.sdk.transforms/GroupIntoBatchesTest/testInGlobalWindowBatchSizeByteSizeFn/) fails. It seems to pass locally.




[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1319088989

   Run Java PreCommit




[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1232341277

   Run Java PreCommit




[GitHub] [beam] lukecwik commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1335572882

   https://github.com/apache/beam/pull/24463 containing this plus a fix was merged.




[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1231059043

   R: @lukecwik




[GitHub] [beam] nbali commented on a diff in pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r964355971


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -675,9 +674,26 @@ public void testMultipleLimitsAtOnceInGlobalWindowBatchSizeCountAndBatchSizeByte
             .stream()
             .map(s -> KV.of("key", s))
             .collect(Collectors.toList());
+
+    // to ensure ordered firing
+    TestStream.Builder<KV<String, String>> streamBuilder =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+            .advanceWatermarkTo(Instant.EPOCH);
+
+    long offset = 0L;
+    for (KV<String, String> kv : dataToUse) {
+      streamBuilder =
+          streamBuilder.addElements(
+              TimestampedValue.of(kv, Instant.EPOCH.plus(Duration.standardSeconds(offset))));
+      offset++;
+    }
+
+    // fire them all at once
+    TestStream<KV<String, String>> stream = streamBuilder.advanceWatermarkToInfinity();

Review Comment:
   @lukecwik is there a simpler way to guarantee the order of the elements that I missed?





[GitHub] [beam] codecov[bot] commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1238574039

   # [Codecov](https://codecov.io/gh/apache/beam/pull/22953?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#22953](https://codecov.io/gh/apache/beam/pull/22953?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b2a6f46) into [master](https://codecov.io/gh/apache/beam/commit/a60105abb50dca4dc8bf05a713c3cb099f88ec0c?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (a60105a) will **decrease** coverage by `0.05%`.
   > The diff coverage is `60.18%`.
   
   > :exclamation: Current head b2a6f46 differs from pull request most recent head 5f79ed6. Consider uploading reports for the commit 5f79ed6 to get more accurate results
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #22953      +/-   ##
   ==========================================
   - Coverage   73.68%   73.63%   -0.06%     
   ==========================================
     Files         713      716       +3     
     Lines       94988    95197     +209     
   ==========================================
   + Hits        69992    70098     +106     
   - Misses      23695    23798     +103     
     Partials     1301     1301              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.42% <70.20%> (-0.09%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/22953?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/go/pkg/beam/core/runtime/harness/harness.go](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvaGFybmVzcy9oYXJuZXNzLmdv) | `10.18% <0.00%> (ø)` | |
   | [sdks/go/pkg/beam/io/databaseio/database.go](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9pby9kYXRhYmFzZWlvL2RhdGFiYXNlLmdv) | `24.24% <0.00%> (ø)` | |
   | [sdks/go/pkg/beam/io/databaseio/writer.go](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9pby9kYXRhYmFzZWlvL3dyaXRlci5nbw==) | `0.00% <0.00%> (ø)` | |
   | [...examples/inference/pytorch\_image\_classification.py](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvaW5mZXJlbmNlL3B5dG9yY2hfaW1hZ2VfY2xhc3NpZmljYXRpb24ucHk=) | `0.00% <0.00%> (ø)` | |
   | [...am/examples/inference/pytorch\_language\_modeling.py](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvaW5mZXJlbmNlL3B5dG9yY2hfbGFuZ3VhZ2VfbW9kZWxpbmcucHk=) | `0.00% <0.00%> (ø)` | |
   | [...thon/apache\_beam/ml/inference/pytorch\_inference.py](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWwvaW5mZXJlbmNlL3B5dG9yY2hfaW5mZXJlbmNlLnB5) | `0.00% <0.00%> (ø)` | |
   | [...thon/apache\_beam/runners/worker/sdk\_worker\_main.py](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlcl9tYWluLnB5) | `78.48% <ø> (ø)` | |
   | [...ference/pytorch\_image\_classification\_benchmarks.py](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL2luZmVyZW5jZS9weXRvcmNoX2ltYWdlX2NsYXNzaWZpY2F0aW9uX2JlbmNobWFya3MucHk=) | `0.00% <0.00%> (ø)` | |
   | [.../inference/pytorch\_language\_modeling\_benchmarks.py](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL2luZmVyZW5jZS9weXRvcmNoX2xhbmd1YWdlX21vZGVsaW5nX2JlbmNobWFya3MucHk=) | `0.00% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/typehints/schemas.py](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL3NjaGVtYXMucHk=) | `93.84% <ø> (ø)` | |
   | ... and [35 more](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1315425812

   > > @lukecwik Can I use the same PipelineOptions there (in WriteFiles) as well? Does it use the same network layer?
   > 
   > Sort of, the issue is that the person might be writing to a different file system that isn't GCS. If you had a way to check the filesystem then you could apply the GCS limit. On the other hand it might make sense to use it anyways.
   
   The class wasn't even available as a dependency (for a good reason), so I just hardcoded 64MB there.
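
The "check the filesystem" idea quoted above could look roughly like the plain-Java sketch below. It is not what the PR does (the PR hardcodes 64MB): the class `UploadLimitSketch`, the method `limitFor`, and the 100 MiB fallback are hypothetical, and the 64MB figure from the thread is read here as 64 MiB.

```java
import java.net.URI;

/** Picks a byte limit from the destination's URI scheme. Hypothetical helper, not Beam code. */
final class UploadLimitSketch {
  // "64MB" from the thread, interpreted as 64 MiB (assumption).
  static final long GCS_LIMIT_BYTES = 64L * (1L << 20);
  // Hypothetical fallback for every other filesystem.
  static final long DEFAULT_LIMIT_BYTES = 100L * (1L << 20);

  static long limitFor(String destination) {
    // getScheme() returns null for paths without a scheme, such as "/tmp/out".
    String scheme = URI.create(destination).getScheme();
    return "gs".equalsIgnoreCase(scheme) ? GCS_LIMIT_BYTES : DEFAULT_LIMIT_BYTES;
  }

  public static void main(String[] args) {
    System.out.println(limitFor("gs://bucket/output/part-0")); // prints 67108864
    System.out.println(limitFor("/tmp/output/part-0"));        // prints 104857600
  }
}
```

Applying the smaller limit everywhere, as the quoted reply also suggests, avoids the scheme check entirely at the cost of smaller batches on other filesystems.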




[GitHub] [beam] reuvenlax commented on a diff in pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r1024043582


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java:
##########
@@ -424,13 +465,40 @@ public void processElement(
         BoundedWindow window,
         OutputReceiver<KV<K, Iterable<InputT>>> receiver) {
       LOG.debug("*** BATCH *** Add element for window {} ", window);
+      if (shouldCareAboutWeight()) {
+        final long elementWeight = weigher.apply(element.getValue());
+        if (elementWeight + storedBatchSizeBytes.read() > batchSizeBytes) {

Review Comment:
   This defeats the `readLater` optimization, since you're eagerly reading the value here (which also means there's no point in the `readLater` below). You should add `readLater` calls for `minBufferedTs` (if needed) and `storedBatchSize` earlier in the function.
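
The point about ordering prefetch hints before blocking reads can be shown with a toy model. This is a sketch under loud assumptions, not Beam's state API or any runner's implementation: `PrefetchSketch` is a hypothetical class in which `readLater` only queues a key and the first blocking `read` fetches every queued key in one simulated round trip. How a real runner batches or caches state reads is not specified in this thread.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of a state backend that batches prefetch hints. Not Beam code. */
final class PrefetchSketch {
  private final Map<String, Long> remote = new HashMap<>();  // pretend remote storage
  private final Map<String, Long> cache = new HashMap<>();   // values already fetched
  private final Set<String> pending = new LinkedHashSet<>(); // keys hinted via readLater
  private int roundTrips = 0;

  void put(String key, long value) {
    remote.put(key, value);
  }

  /** Hints that the key will be read soon; performs no I/O by itself. */
  void readLater(String key) {
    if (!cache.containsKey(key)) {
      pending.add(key);
    }
  }

  /** Blocking read: fetches the key and all pending hints in one simulated round trip. */
  long read(String key) {
    if (!cache.containsKey(key)) {
      pending.add(key);
      for (String k : pending) {
        cache.put(k, remote.getOrDefault(k, 0L));
      }
      pending.clear();
      roundTrips++;
    }
    return cache.get(key);
  }

  private static PrefetchSketch newStore() {
    PrefetchSketch store = new PrefetchSketch();
    store.put("storedBatchSizeBytes", 12L);
    store.put("minBufferedTs", 1000L);
    return store;
  }

  /** Eager read first: the later hint cannot share the first round trip. */
  static int eagerRoundTrips() {
    PrefetchSketch store = newStore();
    store.read("storedBatchSizeBytes");
    store.readLater("minBufferedTs");
    store.read("minBufferedTs");
    return store.roundTrips;
  }

  /** Both hints issued up front: one round trip serves both reads. */
  static int prefetchedRoundTrips() {
    PrefetchSketch store = newStore();
    store.readLater("storedBatchSizeBytes");
    store.readLater("minBufferedTs");
    store.read("storedBatchSizeBytes");
    store.read("minBufferedTs");
    return store.roundTrips;
  }

  public static void main(String[] args) {
    System.out.println(eagerRoundTrips());      // prints 2
    System.out.println(prefetchedRoundTrips()); // prints 1
  }
}
```

In this toy, a value stays cached once read, but that is a property of the sketch and should not be taken as a guarantee about Beam.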





[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1242165446

   Run Java PreCommit




[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1241426102

   Run Java PreCommit




[GitHub] [beam] lukecwik commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1234734319

   Run Java PreCommit




[GitHub] [beam] nbali commented on a diff in pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r1022833261


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -267,20 +225,9 @@ public void testWithShardedKeyInGlobalWindow() {
     PAssert.that("Incorrect batch size in one or more elements", collection)

Review Comment:
   Erhm, isn't this comment only valid for `.withShardedKey()`?



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -709,4 +645,136 @@ public void processElement(ProcessContext c, BoundedWindow window) {
 
     pipeline.run().waitUntilFinish();
   }
+
+  @Test
+  @Category({
+    ValidatesRunner.class,
+    NeedsRunner.class,
+    UsesTimersInParDo.class,
+    UsesTestStream.class,
+    UsesStatefulParDo.class,
+    UsesOnWindowExpiration.class
+  })
+  public void testMultipleLimitsAtOnceInGlobalWindowBatchSizeCountAndBatchSizeByteSize() {
+    // with using only one of the limits the result would be only 2 batches,
+    // if we have 3 both limit works
+    List<KV<String, String>> dataToUse =
+        Lists.newArrayList(
+                "a-1",
+                "a-2",
+                "a-3" + Strings.repeat("-", 100),
+                // byte size limit reached (BATCH_SIZE_BYTES = 25)
+                "b-4",
+                "b-5",
+                "b-6",
+                "b-7",
+                "b-8",
+                // count limit reached (BATCH_SIZE = 5)
+                "c-9")
+            .stream()
+            .map(s -> KV.of("key", s))
+            .collect(Collectors.toList());
+
+    // to ensure ordered firing
+    TestStream.Builder<KV<String, String>> streamBuilder =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+            .advanceWatermarkTo(Instant.EPOCH);
+
+    long offset = 0L;
+    for (KV<String, String> kv : dataToUse) {

Review Comment:
   Won't the different/increasing timestamps already guarantee that?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java:
##########
@@ -113,6 +113,9 @@
   // If user triggering is supplied, we will trigger the file write after this many records are
   // written.
   static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
+  // If user triggering is supplied, we will trigger the file write after this many bytes are
+  // written.
+  static final long FILE_TRIGGERING_BYTE_COUNT = 100 * (1L << 20); // 100MiB

Review Comment:
   I used a different algorithm, but IMO it now stays close to the original concept of the transform.
   



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -116,16 +123,6 @@ public void testInGlobalWindowBatchSizeCount() {
     PAssert.that("Incorrect batch size in one or more elements", collection)
         .satisfies(
             new SerializableFunction<Iterable<KV<String, Iterable<String>>>, Void>() {
-
-              private boolean checkBatchSizes(Iterable<KV<String, Iterable<String>>> listToCheck) {
-                for (KV<String, Iterable<String>> element : listToCheck) {
-                  if (Iterables.size(element.getValue()) != BATCH_SIZE) {

Review Comment:
   Actually, I think I did solve that, apart from the inaccuracy of the weigher.



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -709,4 +645,136 @@ public void processElement(ProcessContext c, BoundedWindow window) {
 
     pipeline.run().waitUntilFinish();
   }
+
+  @Test
+  @Category({
+    ValidatesRunner.class,
+    NeedsRunner.class,
+    UsesTimersInParDo.class,
+    UsesTestStream.class,
+    UsesStatefulParDo.class,
+    UsesOnWindowExpiration.class
+  })
+  public void testMultipleLimitsAtOnceInGlobalWindowBatchSizeCountAndBatchSizeByteSize() {
+    // with using only one of the limits the result would be only 2 batches,
+    // if we have 3 both limit works
+    List<KV<String, String>> dataToUse =
+        Lists.newArrayList(
+                "a-1",
+                "a-2",
+                "a-3" + Strings.repeat("-", 100),
+                // byte size limit reached (BATCH_SIZE_BYTES = 25)
+                "b-4",
+                "b-5",
+                "b-6",
+                "b-7",
+                "b-8",
+                // count limit reached (BATCH_SIZE = 5)
+                "c-9")
+            .stream()
+            .map(s -> KV.of("key", s))
+            .collect(Collectors.toList());
+
+    // to ensure ordered firing

Review Comment:
   done in [`6abe4cd` (#22953)](https://github.com/apache/beam/pull/22953/commits/6abe4cd7b17698fa1e1ab4870e6cb805feed0b10)



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -709,4 +645,136 @@ public void processElement(ProcessContext c, BoundedWindow window) {
 
     pipeline.run().waitUntilFinish();
   }
+
+  @Test
+  @Category({
+    ValidatesRunner.class,
+    NeedsRunner.class,
+    UsesTimersInParDo.class,
+    UsesTestStream.class,
+    UsesStatefulParDo.class,
+    UsesOnWindowExpiration.class
+  })
+  public void testMultipleLimitsAtOnceInGlobalWindowBatchSizeCountAndBatchSizeByteSize() {
+    // with using only one of the limits the result would be only 2 batches,
+    // if we have 3 both limit works

Review Comment:
   done in [`6abe4cd` (#22953)](https://github.com/apache/beam/pull/22953/commits/6abe4cd7b17698fa1e1ab4870e6cb805feed0b10)





[GitHub] [beam] lukecwik commented on a diff in pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r1036510196


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java:
##########
@@ -117,6 +117,11 @@
    */
   @AutoValue
   public abstract static class BatchingParams<InputT> implements Serializable {
+    public static <InputT> BatchingParams<InputT> createDefault() {

Review Comment:
   ```suggestion
       private static <InputT> BatchingParams<InputT> createDefault() {
   ```



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -155,90 +151,68 @@ public void testInGlobalWindowBatchSizeByteSize() {
     PAssert.that("Incorrect batch size in one or more elements", collection)
         .satisfies(
             new SerializableFunction<Iterable<KV<String, Iterable<String>>>, Void>() {
-
-              private boolean checkBatchSizes(Iterable<KV<String, Iterable<String>>> listToCheck) {
-                for (KV<String, Iterable<String>> element : listToCheck) {
-                  long byteSize = 0;
-                  for (String str : element.getValue()) {
-                    if (byteSize >= BATCH_SIZE_BYTES) {
-                      // We already reached the batch size, so extra elements are not expected.
-                      return false;
-                    }
-                    try {
-                      byteSize += StringUtf8Coder.of().getEncodedElementByteSize(str);
-                    } catch (Exception e) {
-                      throw new RuntimeException(e);
-                    }
-                  }
-                }
-                return true;
-              }
-
               @Override
               public Void apply(Iterable<KV<String, Iterable<String>>> input) {
-                assertTrue(checkBatchSizes(input));
+                assertTrue(checkBatchByteSizes(input));
                 return null;
               }
             });
     PAssert.thatSingleton("Incorrect collection size", collection.apply("Count", Count.globally()))
-        .isEqualTo(3L);
+        .isEqualTo(4L);
     pipeline.run();
   }
 
   @Test
   @Category({
     ValidatesRunner.class,
     NeedsRunner.class,
+    UsesTestStream.class,
     UsesTimersInParDo.class,
     UsesStatefulParDo.class,
     UsesOnWindowExpiration.class
   })
   public void testInGlobalWindowBatchSizeByteSizeFn() {
+    SerializableFunction<String, Long> getElementByteSizeFn =
+        s -> {
+          try {
+            return 2 * StringUtf8Coder.of().getEncodedElementByteSize(s);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        };
+
+    // to ensure ordered processing
+    TestStream.Builder<KV<String, String>> streamBuilder =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+            .advanceWatermarkTo(Instant.EPOCH);

Review Comment:
   ```suggestion
           TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lukecwik commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1332838500

   Run Java Dataflow V2 ValidatesRunner Streaming




[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1231791579

   irrelevant test failure with "Java Tests / Java Wordcount Direct Runner (windows-latest) (pull_request)"




[GitHub] [beam] nbali commented on a diff in pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r964255247


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -116,16 +123,6 @@ public void testInGlobalWindowBatchSizeCount() {
     PAssert.that("Incorrect batch size in one or more elements", collection)
         .satisfies(
             new SerializableFunction<Iterable<KV<String, Iterable<String>>>, Void>() {
-
-              private boolean checkBatchSizes(Iterable<KV<String, Iterable<String>>> listToCheck) {
-                for (KV<String, Iterable<String>> element : listToCheck) {
-                  if (Iterables.size(element.getValue()) != BATCH_SIZE) {

Review Comment:
   I did notice that the check here is `!=` and not `>`, but the test is still valid with `>`: we have 10 elements and a batch size of 5, and we check the batch count at the end against `EVEN_NUM_ELEMENTS / BATCH_SIZE`, so no batch can hold anything but 5 elements.
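
The counting argument in this comment can be checked with a small standalone sketch. It does not use Beam; the class and method names are hypothetical and chosen only for illustration. It enumerates every way to split 10 elements into 2 non-empty batches and counts how many pass an upper-bound (`>`) check with a batch size of 5.

```java
final class BatchCountCheckSketch {

  /** Mirrors a `>`-style check: fails only if some batch is larger than batchSize. */
  static boolean passesUpperBoundCheck(int[] batchSizes, int batchSize) {
    for (int size : batchSizes) {
      if (size > batchSize) {
        return false;
      }
    }
    return true;
  }

  /** Counts the splits of `total` elements into two non-empty batches that pass the check. */
  static int validTwoBatchSplits(int total, int batchSize) {
    int valid = 0;
    for (int first = 1; first < total; first++) {
      if (passesUpperBoundCheck(new int[] {first, total - first}, batchSize)) {
        valid++;
      }
    }
    return valid;
  }

  public static void main(String[] args) {
    // 10 elements, batch size 5, exactly 2 batches expected.
    System.out.println(validTwoBatchSplits(10, 5));
  }
}
```

Only the 5 + 5 split survives, so in this configuration the upper-bound check combined with the batch-count assertion is as strict as the original `!=` check.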





[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1240117166

   Run Java PreCommit




[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1231060002

   FYI, more than likely it requires spotless; I used github.dev for it.




[GitHub] [beam] nbali commented on a diff in pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r976932415


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java:
##########
@@ -113,6 +113,9 @@
   // If user triggering is supplied, we will trigger the file write after this many records are
   // written.
   static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
+  // If user triggering is supplied, we will trigger the file write after this many bytes are
+  // written.
+  static final long FILE_TRIGGERING_BYTE_COUNT = 100 * (1L << 20); // 100MiB

Review Comment:
   @lukecwik 
   On second thought, I think there is a problem with using this 64MB default. We only flush the batch inside `GroupIntoBatches` once `storedBatchSizeBytes` is greater than or equal to the limit. So if we make the limit 64MB, more than likely we will flush just a bit more than 64MB, which won't fit into the 64MB buffer.
   
   So either the triggering byte count should be x% smaller than the 64MB default, or `GroupIntoBatches` has to be modified so that if the current element would push the batch over the byte size limit, it fires the batch before adding that element. The second seems like a better solution, but I assume doing the `storedBatchSizeBytes.read()` sooner would have a performance impact.
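
The difference between the two flushing strategies described in this comment can be illustrated with a minimal standalone sketch. This is not the `GroupIntoBatches` implementation: it models the buffered size as a plain `long` instead of Beam state, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class FlushStrategySketch {

  /** Flush-after-add: buffer the element first, then flush once the limit is reached. */
  static List<Long> flushAfterAdd(long[] elementSizes, long limitBytes) {
    List<Long> batchSizes = new ArrayList<>();
    long buffered = 0;
    for (long size : elementSizes) {
      buffered += size;
      if (buffered >= limitBytes) {
        batchSizes.add(buffered); // may exceed the limit by up to one element
        buffered = 0;
      }
    }
    if (buffered > 0) {
      batchSizes.add(buffered); // final partial batch
    }
    return batchSizes;
  }

  /** Flush-before-add: if the next element would overshoot, flush the current batch first. */
  static List<Long> flushBeforeAdd(long[] elementSizes, long limitBytes) {
    List<Long> batchSizes = new ArrayList<>();
    long buffered = 0;
    for (long size : elementSizes) {
      if (buffered > 0 && buffered + size > limitBytes) {
        batchSizes.add(buffered);
        buffered = 0;
      }
      buffered += size;
      if (buffered >= limitBytes) {
        // Limit reached exactly, or a single element is larger than the limit.
        batchSizes.add(buffered);
        buffered = 0;
      }
    }
    if (buffered > 0) {
      batchSizes.add(buffered);
    }
    return batchSizes;
  }

  public static void main(String[] args) {
    long[] sizes = {30, 30, 30, 30};
    System.out.println(flushAfterAdd(sizes, 64));  // batches of 90 and 30
    System.out.println(flushBeforeAdd(sizes, 64)); // batches of 60 and 60
  }
}
```

With a limit of 64 and four elements of size 30, the first strategy emits a 90-unit batch that overshoots the limit, while the second keeps every batch at or below it. The second strategy has to know the buffered size before it adds each element, which is the earlier read the comment is worried about.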





[GitHub] [beam] nbali commented on a diff in pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r964272495


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java:
##########
@@ -117,6 +117,11 @@
    */
   @AutoValue
   public abstract static class BatchingParams<InputT> implements Serializable {

Review Comment:
   see [`040b744` (#22953)](https://github.com/apache/beam/pull/22953/commits/040b744454a6985ca2bb1c42e30fe90fabd42a59) and [`b1b732c` (#22953)](https://github.com/apache/beam/pull/22953/commits/b1b732c60f2dbeebfddc50afa7a946f2ed0864ca)





[GitHub] [beam] nbali commented on a diff in pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r964355971


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -675,9 +674,26 @@ public void testMultipleLimitsAtOnceInGlobalWindowBatchSizeCountAndBatchSizeByte
             .stream()
             .map(s -> KV.of("key", s))
             .collect(Collectors.toList());
+
+    // to ensure ordered firing
+    TestStream.Builder<KV<String, String>> streamBuilder =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+            .advanceWatermarkTo(Instant.EPOCH);
+
+    long offset = 0L;
+    for (KV<String, String> kv : dataToUse) {
+      streamBuilder =
+          streamBuilder.addElements(
+              TimestampedValue.of(kv, Instant.EPOCH.plus(Duration.standardSeconds(offset))));
+      offset++;
+    }
+
+    // fire them all at once
+    TestStream<KV<String, String>> stream = streamBuilder.advanceWatermarkToInfinity();

Review Comment:
   @lukecwik is there any other, simpler way (so not TestStream+TimestampedValue) to guarantee the order of the elements that I missed?





[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1331086230

   Run Java_Examples_Dataflow PreCommit




[GitHub] [beam] lukecwik commented on a diff in pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r961075566


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java:
##########
@@ -117,6 +117,11 @@
    */
   @AutoValue
   public abstract static class BatchingParams<InputT> implements Serializable {

Review Comment:
   It looks like you're adding support for GroupIntoBatches to limit on count and byte size at the same time.
   
   Can you add tests that cover this new scenario to:
   * GroupIntoBatchesTest
   * GroupIntoBatchesTranslationTest
   



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java:
##########
@@ -113,6 +113,9 @@
   // If user triggering is supplied, we will trigger the file write after this many records are
   // written.
   static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
+  // If user triggering is supplied, we will trigger the file write after this many bytes are
+  // written.
+  static final long FILE_TRIGGERING_BYTE_COUNT = 100 * (1L << 20); // 100MiB

Review Comment:
   It looks like we already have a memory limit for writing of 20 parallel writers with 64 MB buffers. Should we limit this triggering to 64 MB as well so that it fits in one chunk?
   
   CC: @reuvenlax Any suggestions here?
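
The arithmetic behind this question can be sketched as follows. The sketch assumes the "64 MB" buffer is 64 MiB and that each of the 20 writers holds at most one full buffer; the class and method names are hypothetical.

```java
final class UploadChunkMathSketch {
  static final long MIB = 1L << 20;

  /** Number of fixed-size chunks needed to hold batchBytes (ceiling division). */
  static long chunksNeeded(long batchBytes, long chunkBytes) {
    return (batchBytes + chunkBytes - 1) / chunkBytes;
  }

  /** Upper bound on buffer memory when every writer holds one full chunk. */
  static long totalBufferBytes(int writers, long chunkBytes) {
    return writers * chunkBytes;
  }

  public static void main(String[] args) {
    long chunk = 64 * MIB; // assumed buffer size
    System.out.println(totalBufferBytes(20, chunk) / MIB + " MiB across 20 writers");
    System.out.println(chunksNeeded(100 * MIB, chunk) + " chunk(s) for a 100 MiB trigger");
    System.out.println(chunksNeeded(64 * MIB, chunk) + " chunk(s) for a 64 MiB trigger");
  }
}
```

Under these assumptions a 100 MiB trigger spans two chunks, while a 64 MiB trigger fits in one, provided the flushed batch does not overshoot the trigger.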





[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1329906355

   Run Java PreCommit




[GitHub] [beam] lukecwik commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1334465932

   Note that I cloned this PR and [added this patch](https://github.com/apache/beam/pull/24463/commits/ee9cd46570a57ec012edbd40edec868307dce57d) and opened up [a new PR](https://github.com/apache/beam/pull/24463). If the Dataflow tests there pass I intend to merge it and close this one.




[GitHub] [beam] lukecwik closed pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
lukecwik closed pull request #22953: Fix for #22951
URL: https://github.com/apache/beam/pull/22953




[GitHub] [beam] lukecwik commented on a diff in pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r1036500319


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -709,4 +645,136 @@ public void processElement(ProcessContext c, BoundedWindow window) {
 
     pipeline.run().waitUntilFinish();
   }
+
+  @Test
+  @Category({
+    ValidatesRunner.class,
+    NeedsRunner.class,
+    UsesTimersInParDo.class,
+    UsesTestStream.class,
+    UsesStatefulParDo.class,
+    UsesOnWindowExpiration.class
+  })
+  public void testMultipleLimitsAtOnceInGlobalWindowBatchSizeCountAndBatchSizeByteSize() {
+    // with using only one of the limits the result would be only 2 batches,
+    // if we have 3 both limit works
+    List<KV<String, String>> dataToUse =
+        Lists.newArrayList(
+                "a-1",
+                "a-2",
+                "a-3" + Strings.repeat("-", 100),
+                // byte size limit reached (BATCH_SIZE_BYTES = 25)
+                "b-4",
+                "b-5",
+                "b-6",
+                "b-7",
+                "b-8",
+                // count limit reached (BATCH_SIZE = 5)
+                "c-9")
+            .stream()
+            .map(s -> KV.of("key", s))
+            .collect(Collectors.toList());
+
+    // to ensure ordered firing
+    TestStream.Builder<KV<String, String>> streamBuilder =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+            .advanceWatermarkTo(Instant.EPOCH);
+
+    long offset = 0L;
+    for (KV<String, String> kv : dataToUse) {

Review Comment:
   No, elements can be processed in parallel in any arbitrary order. Timestamps are used with windowing, which can hold/buffer inputs and produce outputs, but that isn't the case with TestStream.
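
Why element order matters for the quoted test can be seen in a standalone simulation of its data. This is a simplified sketch, not Beam code: it assumes elements are processed strictly one at a time, measures size as the UTF-8 byte length (the real coder may add a length prefix), and flushes once either limit is reached. The names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class MultiLimitBatchSketch {

  /** Counts batches when flushing as soon as the count or byte limit is reached. */
  static int countBatches(List<String> elements, long maxCount, long maxBytes) {
    int batches = 0;
    long count = 0;
    long bytes = 0;
    for (String element : elements) {
      count++;
      bytes += element.getBytes(StandardCharsets.UTF_8).length;
      if (count >= maxCount || bytes >= maxBytes) {
        batches++;
        count = 0;
        bytes = 0;
      }
    }
    return count > 0 ? batches + 1 : batches; // leftover elements form a final batch
  }

  /** "a-3" followed by 100 dashes, as in the quoted test data. */
  static String longElement() {
    StringBuilder sb = new StringBuilder("a-3");
    for (int i = 0; i < 100; i++) {
      sb.append('-');
    }
    return sb.toString();
  }

  static List<String> inOrder() {
    return new ArrayList<>(
        Arrays.asList("a-1", "a-2", longElement(), "b-4", "b-5", "b-6", "b-7", "b-8", "c-9"));
  }

  public static void main(String[] args) {
    long unlimited = Long.MAX_VALUE;
    System.out.println(countBatches(inOrder(), 5, unlimited)); // count limit only
    System.out.println(countBatches(inOrder(), unlimited, 25)); // byte limit only
    System.out.println(countBatches(inOrder(), 5, 25)); // both limits

    // Same elements, but the long one arrives fifth: both limits trip on the same element.
    List<String> reordered =
        Arrays.asList("a-1", "a-2", "b-4", "b-5", longElement(), "b-6", "b-7", "b-8", "c-9");
    System.out.println(countBatches(reordered, 5, 25));
  }
}
```

In the intended order each single limit yields 2 batches and both limits together yield 3. If the long element happens to arrive fifth, both limits fire on the same element and only 2 batches result, so an assertion on 3 batches holds only when the runner processes the elements in the intended order.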





[GitHub] [beam] lukecwik commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1333161247

   Run Java PreCommit




[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1243708064

   Note to self: the same happens with https://github.com/a0x8o/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java




[GitHub] [beam] github-actions[bot] commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1231059806

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1324607663

   Run Java PreCommit




[GitHub] [beam] lukecwik commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1334046715

   Run Java PreCommit




[GitHub] [beam] lukecwik commented on a diff in pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r1036500319


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -709,4 +645,136 @@ public void processElement(ProcessContext c, BoundedWindow window) {
 
     pipeline.run().waitUntilFinish();
   }
+
+  @Test
+  @Category({
+    ValidatesRunner.class,
+    NeedsRunner.class,
+    UsesTimersInParDo.class,
+    UsesTestStream.class,
+    UsesStatefulParDo.class,
+    UsesOnWindowExpiration.class
+  })
+  public void testMultipleLimitsAtOnceInGlobalWindowBatchSizeCountAndBatchSizeByteSize() {
+    // with using only one of the limits the result would be only 2 batches,
+    // if we have 3 both limit works
+    List<KV<String, String>> dataToUse =
+        Lists.newArrayList(
+                "a-1",
+                "a-2",
+                "a-3" + Strings.repeat("-", 100),
+                // byte size limit reached (BATCH_SIZE_BYTES = 25)
+                "b-4",
+                "b-5",
+                "b-6",
+                "b-7",
+                "b-8",
+                // count limit reached (BATCH_SIZE = 5)
+                "c-9")
+            .stream()
+            .map(s -> KV.of("key", s))
+            .collect(Collectors.toList());
+
+    // to ensure ordered firing
+    TestStream.Builder<KV<String, String>> streamBuilder =
+        TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+            .advanceWatermarkTo(Instant.EPOCH);
+
+    long offset = 0L;
+    for (KV<String, String> kv : dataToUse) {

Review Comment:
   You're right, each addElements call is its own batch that needs to be processed.





[GitHub] [beam] lukecwik commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1332838348

   Run Java Dataflow V2 ValidatesRunner




[GitHub] [beam] lukecwik commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1332837925

   Run Dataflow ValidatesRunner




[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1317942990

   Run Java_GCP_IO_Direct PreCommit




[GitHub] [beam] lukecwik commented on a diff in pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r964007253


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java:
##########
@@ -113,6 +113,9 @@
   // If user triggering is supplied, we will trigger the file write after this many records are
   // written.
   static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
+  // If user triggering is supplied, we will trigger the file write after this many bytes are
+  // written.
+  static final long FILE_TRIGGERING_BYTE_COUNT = 100 * (1L << 20); // 100MiB

Review Comment:
   The default comes from this constant:
   https://www.javadoc.io/static/com.google.cloud.bigdataoss/util/1.9.17/com/google/cloud/hadoop/util/AsyncWriteChannelOptions.html#UPLOAD_CHUNK_SIZE_DEFAULT
   
   The user can override the default using this pipeline option: https://github.com/apache/beam/blob/b2a6f46fb21709cc1927ce1950a38a922dce0a35/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java#L91
   
   





[GitHub] [beam] nbali commented on a diff in pull request #22953: Fix for #22951

Posted by GitBox <gi...@apache.org>.
nbali commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r964180547


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java:
##########
@@ -113,6 +113,9 @@
   // If user triggering is supplied, we will trigger the file write after this many records are
   // written.
   static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
+  // If user triggering is supplied, we will trigger the file write after this many bytes are
+  // written.
+  static final long FILE_TRIGGERING_BYTE_COUNT = 100 * (1L << 20); // 100MiB

Review Comment:
   done in [`bcd4ba9` (#22953)](https://github.com/apache/beam/pull/22953/commits/bcd4ba912a224aaba31be71b5eade04a2219de09)


