Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/30 02:08:04 UTC
[GitHub] [beam] nbali opened a new pull request, #22953: Fix for #22951
nbali opened a new pull request, #22953:
URL: https://github.com/apache/beam/pull/22953
Closes #22951
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1319554491
Waiting for https://github.com/apache/beam/issues/20819
[GitHub] [beam] lukecwik commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1332838200
Run Dataflow Streaming ValidatesRunner
[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1243711217
@lukecwik Can I use the same PipelineOptions there as well? Does it use the same network layer?
[GitHub] [beam] nbali commented on a diff in pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r1024072638
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java:
##########
@@ -424,13 +465,40 @@ public void processElement(
BoundedWindow window,
OutputReceiver<KV<K, Iterable<InputT>>> receiver) {
LOG.debug("*** BATCH *** Add element for window {} ", window);
+ if (shouldCareAboutWeight()) {
+ final long elementWeight = weigher.apply(element.getValue());
+ if (elementWeight + storedBatchSizeBytes.read() > batchSizeBytes) {
Review Comment:
TBH I wasn't sure how the `read()`/`readLater()` implementation works, e.g. whether a value that has been read once stays cached for the whole duration or is fetched again. I assumed `readLater()`, as every prefetching method should be, is already optimized to be a no-op for values that are already present, so the worst case is an unnecessary no-op call.
So to sum up, does that mean a value returned by a `read()` call will always be available from that point forward? Anyway, I modified the PR/code to reflect this.
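For readers following the hunk above, here is a minimal, self-contained sketch of the idea behind the weight check: flush the current batch before adding an element that would push the stored bytes over the limit, and flush again once either the count limit or the byte limit is reached. `WeightedBatcher`, its parameters, and the non-empty guard are hypothetical and only model the behaviour discussed here; this is not the `GroupIntoBatches` implementation and is not claimed to reproduce its exact output.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of batching under a count limit and a byte limit; not Beam code. */
final class WeightedBatcher {
  static List<List<String>> batch(List<String> elements, int maxCount, long maxBytes) {
    List<List<String>> batches = new ArrayList<>();
    List<String> current = new ArrayList<>();
    long currentBytes = 0L;
    for (String element : elements) {
      // Assumption: the weight of an element is its UTF-8 encoded length.
      long weight = element.getBytes(StandardCharsets.UTF_8).length;
      // Analogous to `elementWeight + storedBatchSizeBytes.read() > batchSizeBytes`:
      // emit what is buffered before adding an element that would exceed the limit.
      if (!current.isEmpty() && currentBytes + weight > maxBytes) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0L;
      }
      current.add(element);
      currentBytes += weight;
      // Emit as soon as either limit is reached.
      if (current.size() >= maxCount || currentBytes >= maxBytes) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0L;
      }
    }
    // Emit the remainder, similar in spirit to a flush on window expiration.
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }

  public static void main(String[] args) {
    List<String> input = Arrays.asList("aa", "bb", "cccccccccc", "d", "e", "f", "g");
    System.out.println(batch(input, 3, 8L));
  }
}
```

Note that in this sketch a single element larger than the byte limit still ends up in a batch of its own, so the byte limit is a target rather than a hard cap.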
[GitHub] [beam] nbali commented on a diff in pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r963987206
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java:
##########
@@ -113,6 +113,9 @@
// If user triggering is supplied, we will trigger the file write after this many records are
// written.
static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
+ // If user triggering is supplied, we will trigger the file write after this many bytes are
+ // written.
+ static final long FILE_TRIGGERING_BYTE_COUNT = 100 * (1L << 20); // 100MiB
Review Comment:
@lukecwik Having the same limit as a buffer actually makes sense to me, but can you direct me to where I might find that limit? I can see it in the comments for `DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE`, but instead of hardcoding 64MB here as well, I would rather reference the original limit directly.
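For reference, the two sizes being compared work out as follows. This is only a worked-arithmetic sketch: `ByteLimits` and its members are hypothetical names, and it assumes that "64MB" in the comment means 64 MiB.

```java
/** Hypothetical helper showing the byte values under discussion; not Beam code. */
final class ByteLimits {
  static final long MIB = 1L << 20;            // 1,048,576 bytes
  static final long HUNDRED_MIB = 100 * MIB;   // 104,857,600 bytes, as in the diff
  static final long SIXTY_FOUR_MIB = 64 * MIB; // 67,108,864 bytes (assumed meaning of "64MB")

  /** Returns true once the bytes written so far reach the given limit. */
  static boolean shouldTrigger(long bytesWritten, long limit) {
    return bytesWritten >= limit;
  }

  public static void main(String[] args) {
    System.out.println(HUNDRED_MIB + " vs " + SIXTY_FOUR_MIB);
  }
}
```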
[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1232364060
Run Java PreCommit
[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1318024721
I'm not sure why [org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInGlobalWindowBatchSizeByteSizeFn](https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/24830/testReport/junit/org.apache.beam.sdk.transforms/GroupIntoBatchesTest/testInGlobalWindowBatchSizeByteSizeFn/) fails. It seems to pass locally.
[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1319088989
Run Java PreCommit
[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1232341277
Run Java PreCommit
[GitHub] [beam] lukecwik commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1335572882
https://github.com/apache/beam/pull/24463 containing this plus a fix was merged.
[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1231059043
R: @lukecwik
[GitHub] [beam] nbali commented on a diff in pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r964355971
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -675,9 +674,26 @@ public void testMultipleLimitsAtOnceInGlobalWindowBatchSizeCountAndBatchSizeByte
.stream()
.map(s -> KV.of("key", s))
.collect(Collectors.toList());
+
+ // to ensure ordered firing
+ TestStream.Builder<KV<String, String>> streamBuilder =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+ .advanceWatermarkTo(Instant.EPOCH);
+
+ long offset = 0L;
+ for (KV<String, String> kv : dataToUse) {
+ streamBuilder =
+ streamBuilder.addElements(
+ TimestampedValue.of(kv, Instant.EPOCH.plus(Duration.standardSeconds(offset))));
+ offset++;
+ }
+
+ // fire them all at once
+ TestStream<KV<String, String>> stream = streamBuilder.advanceWatermarkToInfinity();
Review Comment:
@lukecwik is there any other simpler way to guarantee the order of the elements that I missed?
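The timestamp assignment in the hunk above can be isolated as a plain-Java sketch: each element gets `EPOCH + offset` seconds, so the timestamps are strictly increasing in input order. `OrderedTimestamps` is a hypothetical name and the sketch uses `java.time` instead of the Joda-Time types in the test; whether a runner then processes the elements in timestamp order is exactly the open question and is not something this sketch demonstrates.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical helper pairing each value with a strictly increasing timestamp. */
final class OrderedTimestamps {
  static List<Map.Entry<String, Instant>> assign(List<String> values) {
    List<Map.Entry<String, Instant>> out = new ArrayList<>();
    long offset = 0L;
    for (String value : values) {
      // One second per element, starting at the epoch, as in the test loop.
      Instant ts = Instant.EPOCH.plus(Duration.ofSeconds(offset));
      out.add(new AbstractMap.SimpleImmutableEntry<>(value, ts));
      offset++;
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(assign(Arrays.asList("a-1", "a-2", "b-4")));
  }
}
```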
[GitHub] [beam] codecov[bot] commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1238574039
# [Codecov](https://codecov.io/gh/apache/beam/pull/22953?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#22953](https://codecov.io/gh/apache/beam/pull/22953?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b2a6f46) into [master](https://codecov.io/gh/apache/beam/commit/a60105abb50dca4dc8bf05a713c3cb099f88ec0c?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (a60105a) will **decrease** coverage by `0.05%`.
> The diff coverage is `60.18%`.
> :exclamation: Current head b2a6f46 differs from pull request most recent head 5f79ed6. Consider uploading reports for the commit 5f79ed6 to get more accurate results
```diff
@@ Coverage Diff @@
## master #22953 +/- ##
==========================================
- Coverage 73.68% 73.63% -0.06%
==========================================
Files 713 716 +3
Lines 94988 95197 +209
==========================================
+ Hits 69992 70098 +106
- Misses 23695 23798 +103
Partials 1301 1301
```
| Flag | Coverage Δ | |
|---|---|---|
| python | `83.42% <70.20%> (-0.09%)` | :arrow_down: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/beam/pull/22953?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [sdks/go/pkg/beam/core/runtime/harness/harness.go](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvaGFybmVzcy9oYXJuZXNzLmdv) | `10.18% <0.00%> (ø)` | |
| [sdks/go/pkg/beam/io/databaseio/database.go](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9pby9kYXRhYmFzZWlvL2RhdGFiYXNlLmdv) | `24.24% <0.00%> (ø)` | |
| [sdks/go/pkg/beam/io/databaseio/writer.go](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9pby9kYXRhYmFzZWlvL3dyaXRlci5nbw==) | `0.00% <0.00%> (ø)` | |
| [...examples/inference/pytorch\_image\_classification.py](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvaW5mZXJlbmNlL3B5dG9yY2hfaW1hZ2VfY2xhc3NpZmljYXRpb24ucHk=) | `0.00% <0.00%> (ø)` | |
| [...am/examples/inference/pytorch\_language\_modeling.py](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvaW5mZXJlbmNlL3B5dG9yY2hfbGFuZ3VhZ2VfbW9kZWxpbmcucHk=) | `0.00% <0.00%> (ø)` | |
| [...thon/apache\_beam/ml/inference/pytorch\_inference.py](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWwvaW5mZXJlbmNlL3B5dG9yY2hfaW5mZXJlbmNlLnB5) | `0.00% <0.00%> (ø)` | |
| [...thon/apache\_beam/runners/worker/sdk\_worker\_main.py](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlcl9tYWluLnB5) | `78.48% <ø> (ø)` | |
| [...ference/pytorch\_image\_classification\_benchmarks.py](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL2luZmVyZW5jZS9weXRvcmNoX2ltYWdlX2NsYXNzaWZpY2F0aW9uX2JlbmNobWFya3MucHk=) | `0.00% <0.00%> (ø)` | |
| [.../inference/pytorch\_language\_modeling\_benchmarks.py](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9iZW5jaG1hcmtzL2luZmVyZW5jZS9weXRvcmNoX2xhbmd1YWdlX21vZGVsaW5nX2JlbmNobWFya3MucHk=) | `0.00% <0.00%> (ø)` | |
| [sdks/python/apache\_beam/typehints/schemas.py](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL3NjaGVtYXMucHk=) | `93.84% <ø> (ø)` | |
| ... and [35 more](https://codecov.io/gh/apache/beam/pull/22953/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1315425812
> > @lukecwik Can I use the same PipelineOptions there (in WriteFiles) as well? Does it use the same network layer?
>
> Sort of, the issue is that the person might be writing to a different file system that isn't GCS. If you had a way to check the filesystem then you could apply the GCS limit. On the other hand it might make sense to use it anyways.
The class wasn't even available as a dependency (for a good reason), so I just hardcoded 64MB there.
[GitHub] [beam] reuvenlax commented on a diff in pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
reuvenlax commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r1024043582
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java:
##########
@@ -424,13 +465,40 @@ public void processElement(
BoundedWindow window,
OutputReceiver<KV<K, Iterable<InputT>>> receiver) {
LOG.debug("*** BATCH *** Add element for window {} ", window);
+ if (shouldCareAboutWeight()) {
+ final long elementWeight = weigher.apply(element.getValue());
+ if (elementWeight + storedBatchSizeBytes.read() > batchSizeBytes) {
Review Comment:
This defeats the readLater optimization, since you're eagerly reading the value here (which also means there's no point in the readLater below). You should add readLaters for minBufferedTs (if needed) and storedBatchSize earlier in the function.
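A toy model may help illustrate why the ordering matters. The sketch below assumes a state backend where `readLater()` only registers interest and the first `read()` fetches everything registered so far in one round trip, after which values stay cached. `PrefetchSketch` and `FakeStateBackend` are hypothetical; this is not Beam's state API, and real runners may batch or cache differently.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of prefetching state reads; not Beam's implementation. */
final class PrefetchSketch {
  static final class FakeStateBackend {
    private final Map<String, Long> remote = new HashMap<>();
    private final Map<String, Long> cache = new HashMap<>();
    private final Set<String> pending = new LinkedHashSet<>();
    int roundTrips = 0;

    FakeStateBackend() {
      remote.put("storedBatchSizeBytes", 10L);
      remote.put("minBufferedTs", 42L);
    }

    /** Registers interest in a key; a no-op if the value is already cached. */
    void readLater(String key) {
      if (!cache.containsKey(key)) {
        pending.add(key);
      }
    }

    /** Returns the value, fetching all pending keys in a single round trip if needed. */
    long read(String key) {
      if (!cache.containsKey(key)) {
        pending.add(key);
        for (String k : pending) {
          cache.put(k, remote.get(k));
        }
        pending.clear();
        roundTrips++;
      }
      return cache.get(key);
    }
  }

  /** Eager read first: the later readLater can no longer share the round trip. */
  static int eagerRoundTrips() {
    FakeStateBackend state = new FakeStateBackend();
    state.read("storedBatchSizeBytes");
    state.readLater("minBufferedTs");
    state.read("minBufferedTs");
    return state.roundTrips;
  }

  /** Both readLater calls first: one round trip serves both reads. */
  static int prefetchedRoundTrips() {
    FakeStateBackend state = new FakeStateBackend();
    state.readLater("storedBatchSizeBytes");
    state.readLater("minBufferedTs");
    state.read("storedBatchSizeBytes");
    state.read("minBufferedTs");
    return state.roundTrips;
  }

  public static void main(String[] args) {
    System.out.println(eagerRoundTrips() + " vs " + prefetchedRoundTrips());
  }
}
```

In this model, issuing every `readLater()` at the top of the function is what lets the first `read()` pay for all of them at once.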
[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1242165446
Run Java PreCommit
[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1241426102
Run Java PreCommit
[GitHub] [beam] lukecwik commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1234734319
Run Java PreCommit
[GitHub] [beam] nbali commented on a diff in pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r1022833261
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -267,20 +225,9 @@ public void testWithShardedKeyInGlobalWindow() {
PAssert.that("Incorrect batch size in one or more elements", collection)
Review Comment:
Erhm, isn't this comment only valid for `.withShardedKey()`?
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -709,4 +645,136 @@ public void processElement(ProcessContext c, BoundedWindow window) {
pipeline.run().waitUntilFinish();
}
+
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ NeedsRunner.class,
+ UsesTimersInParDo.class,
+ UsesTestStream.class,
+ UsesStatefulParDo.class,
+ UsesOnWindowExpiration.class
+ })
+ public void testMultipleLimitsAtOnceInGlobalWindowBatchSizeCountAndBatchSizeByteSize() {
+ // with using only one of the limits the result would be only 2 batches,
+ // if we have 3 both limit works
+ List<KV<String, String>> dataToUse =
+ Lists.newArrayList(
+ "a-1",
+ "a-2",
+ "a-3" + Strings.repeat("-", 100),
+ // byte size limit reached (BATCH_SIZE_BYTES = 25)
+ "b-4",
+ "b-5",
+ "b-6",
+ "b-7",
+ "b-8",
+ // count limit reached (BATCH_SIZE = 5)
+ "c-9")
+ .stream()
+ .map(s -> KV.of("key", s))
+ .collect(Collectors.toList());
+
+ // to ensure ordered firing
+ TestStream.Builder<KV<String, String>> streamBuilder =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+ .advanceWatermarkTo(Instant.EPOCH);
+
+ long offset = 0L;
+ for (KV<String, String> kv : dataToUse) {
Review Comment:
Won't the different/increasing timestamps already guarantee that?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java:
##########
@@ -113,6 +113,9 @@
// If user triggering is supplied, we will trigger the file write after this many records are
// written.
static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
+ // If user triggering is supplied, we will trigger the file write after this many bytes are
+ // written.
+ static final long FILE_TRIGGERING_BYTE_COUNT = 100 * (1L << 20); // 100MiB
Review Comment:
I used a different algo, but IMO it stays close to the original concept of the transform now.
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -116,16 +123,6 @@ public void testInGlobalWindowBatchSizeCount() {
PAssert.that("Incorrect batch size in one or more elements", collection)
.satisfies(
new SerializableFunction<Iterable<KV<String, Iterable<String>>>, Void>() {
-
- private boolean checkBatchSizes(Iterable<KV<String, Iterable<String>>> listToCheck) {
- for (KV<String, Iterable<String>> element : listToCheck) {
- if (Iterables.size(element.getValue()) != BATCH_SIZE) {
Review Comment:
Actually, I think I did solve that. I mean apart from the inaccuracy of the weigher.
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -709,4 +645,136 @@ public void processElement(ProcessContext c, BoundedWindow window) {
pipeline.run().waitUntilFinish();
}
+
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ NeedsRunner.class,
+ UsesTimersInParDo.class,
+ UsesTestStream.class,
+ UsesStatefulParDo.class,
+ UsesOnWindowExpiration.class
+ })
+ public void testMultipleLimitsAtOnceInGlobalWindowBatchSizeCountAndBatchSizeByteSize() {
+ // with using only one of the limits the result would be only 2 batches,
+ // if we have 3 both limit works
+ List<KV<String, String>> dataToUse =
+ Lists.newArrayList(
+ "a-1",
+ "a-2",
+ "a-3" + Strings.repeat("-", 100),
+ // byte size limit reached (BATCH_SIZE_BYTES = 25)
+ "b-4",
+ "b-5",
+ "b-6",
+ "b-7",
+ "b-8",
+ // count limit reached (BATCH_SIZE = 5)
+ "c-9")
+ .stream()
+ .map(s -> KV.of("key", s))
+ .collect(Collectors.toList());
+
+ // to ensure ordered firing
Review Comment:
done in [`6abe4cd` (#22953)](https://github.com/apache/beam/pull/22953/commits/6abe4cd7b17698fa1e1ab4870e6cb805feed0b10)
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -709,4 +645,136 @@ public void processElement(ProcessContext c, BoundedWindow window) {
pipeline.run().waitUntilFinish();
}
+
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ NeedsRunner.class,
+ UsesTimersInParDo.class,
+ UsesTestStream.class,
+ UsesStatefulParDo.class,
+ UsesOnWindowExpiration.class
+ })
+ public void testMultipleLimitsAtOnceInGlobalWindowBatchSizeCountAndBatchSizeByteSize() {
+ // with using only one of the limits the result would be only 2 batches,
+ // if we have 3 both limit works
Review Comment:
done in [`6abe4cd` (#22953)](https://github.com/apache/beam/pull/22953/commits/6abe4cd7b17698fa1e1ab4870e6cb805feed0b10)
[GitHub] [beam] lukecwik commented on a diff in pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r1036510196
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java:
##########
@@ -117,6 +117,11 @@
*/
@AutoValue
public abstract static class BatchingParams<InputT> implements Serializable {
+ public static <InputT> BatchingParams<InputT> createDefault() {
Review Comment:
```suggestion
private static <InputT> BatchingParams<InputT> createDefault() {
```
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -155,90 +151,68 @@ public void testInGlobalWindowBatchSizeByteSize() {
PAssert.that("Incorrect batch size in one or more elements", collection)
.satisfies(
new SerializableFunction<Iterable<KV<String, Iterable<String>>>, Void>() {
-
- private boolean checkBatchSizes(Iterable<KV<String, Iterable<String>>> listToCheck) {
- for (KV<String, Iterable<String>> element : listToCheck) {
- long byteSize = 0;
- for (String str : element.getValue()) {
- if (byteSize >= BATCH_SIZE_BYTES) {
- // We already reached the batch size, so extra elements are not expected.
- return false;
- }
- try {
- byteSize += StringUtf8Coder.of().getEncodedElementByteSize(str);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
- return true;
- }
-
@Override
public Void apply(Iterable<KV<String, Iterable<String>>> input) {
- assertTrue(checkBatchSizes(input));
+ assertTrue(checkBatchByteSizes(input));
return null;
}
});
PAssert.thatSingleton("Incorrect collection size", collection.apply("Count", Count.globally()))
- .isEqualTo(3L);
+ .isEqualTo(4L);
pipeline.run();
}
@Test
@Category({
ValidatesRunner.class,
NeedsRunner.class,
+ UsesTestStream.class,
UsesTimersInParDo.class,
UsesStatefulParDo.class,
UsesOnWindowExpiration.class
})
public void testInGlobalWindowBatchSizeByteSizeFn() {
+ SerializableFunction<String, Long> getElementByteSizeFn =
+ s -> {
+ try {
+ return 2 * StringUtf8Coder.of().getEncodedElementByteSize(s);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ // to ensure ordered processing
+ TestStream.Builder<KV<String, String>> streamBuilder =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+ .advanceWatermarkTo(Instant.EPOCH);
Review Comment:
```suggestion
TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
```
[GitHub] [beam] lukecwik commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1332838500
Run Java Dataflow V2 ValidatesRunner Streaming
[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1231791579
irrelevant test failure with "Java Tests / Java Wordcount Direct Runner (windows-latest) (pull_request)"
[GitHub] [beam] nbali commented on a diff in pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r964255247
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -116,16 +123,6 @@ public void testInGlobalWindowBatchSizeCount() {
PAssert.that("Incorrect batch size in one or more elements", collection)
.satisfies(
new SerializableFunction<Iterable<KV<String, Iterable<String>>>, Void>() {
-
- private boolean checkBatchSizes(Iterable<KV<String, Iterable<String>>> listToCheck) {
- for (KV<String, Iterable<String>> element : listToCheck) {
- if (Iterables.size(element.getValue()) != BATCH_SIZE) {
Review Comment:
I did notice that it's `!=` and not `>` here, but the test is still valid with `>`: we have 10 elements and a batch size of 5, and we check the batch count at the end against `EVEN_NUM_ELEMENTS / BATCH_SIZE`, so no batch can be anything but 5.
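A minimal sketch of that argument, assuming the batch count check pins the result to exactly two batches (10 / 5). The class and method names here are hypothetical and are not part of the test. It enumerates every way to split the elements into two non-empty batches with no batch larger than the limit:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSizeCheckSketch {

  // Enumerates every way to split `total` elements into exactly two non-empty
  // batches where neither batch is larger than `batchSize` (the `>` check).
  static List<String> twoBatchSplits(int total, int batchSize) {
    List<String> splits = new ArrayList<>();
    for (int first = 1; first < total; first++) {
      int second = total - first;
      if (first <= batchSize && second <= batchSize) {
        splits.add(first + "+" + second);
      }
    }
    return splits;
  }

  public static void main(String[] args) {
    // With 10 elements and a batch size of 5, only the 5+5 split survives.
    System.out.println(twoBatchSplits(10, 5));
  }
}
```

With a looser limit, e.g. `twoBatchSplits(10, 6)`, uneven splits become possible, so the equivalence between `>` and `!=` only holds because the element count is an exact multiple of the batch size.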
[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1240117166
Run Java PreCommit
[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1231060002
fyi, more than likely it requires spotless; I used github.dev for it
[GitHub] [beam] nbali commented on a diff in pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r976932415
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java:
##########
@@ -113,6 +113,9 @@
// If user triggering is supplied, we will trigger the file write after this many records are
// written.
static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
+ // If user triggering is supplied, we will trigger the file write after this many bytes are
+ // written.
+ static final long FILE_TRIGGERING_BYTE_COUNT = 100 * (1L << 20); // 100MiB
Review Comment:
@lukecwik
On second thought, I think there is a problem with using this 64MB default. We only flush the batch inside `GroupIntoBatches` once `storedBatchSizeBytes` is greater than or equal to the limit. So if we make the limit 64MB, we will more than likely flush slightly more than 64MB, which won't fit into the 64MB buffer.
So either the triggering byte count should be x% smaller than the 64MB default, or `GroupIntoBatches` has to be modified so that, if the current element would push the batch over the byte size limit, it fires the batch before adding that element. The second seems like the better solution, but I assume doing the `storedBatchSizeBytes.read()` sooner would have a performance impact.
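To illustrate the difference between the two strategies, here is a standalone sketch in plain Java. It is not the actual `GroupIntoBatches` code: the class name, method names, and the small numbers standing in for 64MB are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchFlushSketch {

  // Flush AFTER adding: the batch is emitted once its stored size is >= the
  // limit, so an emitted batch can overshoot the limit by up to one element.
  static List<Long> flushAfterAdd(long[] elementSizes, long limitBytes) {
    List<Long> batches = new ArrayList<>();
    long stored = 0;
    for (long size : elementSizes) {
      stored += size;
      if (stored >= limitBytes) {
        batches.add(stored);
        stored = 0;
      }
    }
    if (stored > 0) {
      batches.add(stored); // leftover batch at the end of input
    }
    return batches;
  }

  // Flush BEFORE adding: if the next element would push the batch over the
  // limit, emit the current batch first. A single element larger than the
  // limit still ends up in an oversized batch of its own.
  static List<Long> flushBeforeAdd(long[] elementSizes, long limitBytes) {
    List<Long> batches = new ArrayList<>();
    long stored = 0;
    for (long size : elementSizes) {
      if (stored > 0 && stored + size > limitBytes) {
        batches.add(stored);
        stored = 0;
      }
      stored += size;
    }
    if (stored > 0) {
      batches.add(stored); // leftover batch at the end of input
    }
    return batches;
  }

  public static void main(String[] args) {
    long limit = 64; // stands in for the 64MB buffer
    long[] sizes = {30, 30, 30, 30};
    System.out.println(flushAfterAdd(sizes, limit));  // batch sizes with the current behavior
    System.out.println(flushBeforeAdd(sizes, limit)); // batch sizes with the proposed behavior
  }
}
```

In this toy input the flush-after variant emits a first batch of 90, which exceeds the limit of 64, while the flush-before variant keeps every batch at 60.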
[GitHub] [beam] nbali commented on a diff in pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r964272495
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java:
##########
@@ -117,6 +117,11 @@
*/
@AutoValue
public abstract static class BatchingParams<InputT> implements Serializable {
Review Comment:
see [`040b744` (#22953)](https://github.com/apache/beam/pull/22953/commits/040b744454a6985ca2bb1c42e30fe90fabd42a59) and [`b1b732c` (#22953)](https://github.com/apache/beam/pull/22953/commits/b1b732c60f2dbeebfddc50afa7a946f2ed0864ca)
[GitHub] [beam] nbali commented on a diff in pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r964355971
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -675,9 +674,26 @@ public void testMultipleLimitsAtOnceInGlobalWindowBatchSizeCountAndBatchSizeByte
.stream()
.map(s -> KV.of("key", s))
.collect(Collectors.toList());
+
+ // to ensure ordered firing
+ TestStream.Builder<KV<String, String>> streamBuilder =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+ .advanceWatermarkTo(Instant.EPOCH);
+
+ long offset = 0L;
+ for (KV<String, String> kv : dataToUse) {
+ streamBuilder =
+ streamBuilder.addElements(
+ TimestampedValue.of(kv, Instant.EPOCH.plus(Duration.standardSeconds(offset))));
+ offset++;
+ }
+
+ // fire them all at once
+ TestStream<KV<String, String>> stream = streamBuilder.advanceWatermarkToInfinity();
Review Comment:
@lukecwik is there any other (so not TestStream+TimestampedValue) simpler way to guarantee the order of the elements that I missed?
[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1331086230
Run Java_Examples_Dataflow PreCommit
[GitHub] [beam] lukecwik commented on a diff in pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r961075566
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java:
##########
@@ -117,6 +117,11 @@
*/
@AutoValue
public abstract static class BatchingParams<InputT> implements Serializable {
Review Comment:
It looks like you're adding support for GroupIntoBatches to limit on count and byte size at the same time.
Can you add tests that cover this new scenario to:
* GroupIntoBatchesTest
* GroupIntoBatchesTranslationTest
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java:
##########
@@ -113,6 +113,9 @@
// If user triggering is supplied, we will trigger the file write after this many records are
// written.
static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
+ // If user triggering is supplied, we will trigger the file write after this many bytes are
+ // written.
+ static final long FILE_TRIGGERING_BYTE_COUNT = 100 * (1L << 20); // 100MiB
Review Comment:
It looks like we already have a memory limit for writing of 20 parallel writers with 64MB buffers. Should we limit this triggering to 64MB as well so that it fits in one chunk?
CC: @reuvenlax Any suggestions here?
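For reference, a rough sketch of the arithmetic behind this question. The 100MiB trigger comes from the diff above, while the 64MiB buffer size and the 20 writers are taken from this comment and treated as exact binary sizes, which is an assumption. The class and helper names are hypothetical.

```java
public class BufferMathSketch {

  static final long MIB = 1L << 20;
  // Value proposed in the diff above.
  static final long FILE_TRIGGERING_BYTE_COUNT = 100 * MIB;
  // Assumed from the comment: 20 parallel writers, each with a 64MiB buffer.
  static final long UPLOAD_BUFFER_BYTES = 64 * MIB;
  static final int PARALLEL_WRITERS = 20;

  // Total memory held by all writer buffers at once.
  static long totalBufferBytes() {
    return PARALLEL_WRITERS * UPLOAD_BUFFER_BYTES;
  }

  // Number of buffer-sized chunks needed to hold `bytes` (ceiling division).
  static long chunksNeeded(long bytes, long chunkBytes) {
    return (bytes + chunkBytes - 1) / chunkBytes;
  }

  public static void main(String[] args) {
    System.out.println(totalBufferBytes() / MIB + " MiB across all writers");
    System.out.println(
        chunksNeeded(FILE_TRIGGERING_BYTE_COUNT, UPLOAD_BUFFER_BYTES) + " chunks for a 100MiB trigger");
  }
}
```

Under these assumptions a 100MiB trigger spans two chunks, whereas a trigger of at most 64MiB would fit in one.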
[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1329906355
Run Java PreCommit
[GitHub] [beam] lukecwik commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1334465932
Note that I cloned this PR and [added this patch](https://github.com/apache/beam/pull/24463/commits/ee9cd46570a57ec012edbd40edec868307dce57d) and opened up [a new PR](https://github.com/apache/beam/pull/24463). If the Dataflow tests there pass I intend to merge it and close this one.
[GitHub] [beam] lukecwik closed pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
lukecwik closed pull request #22953: Fix for #22951
URL: https://github.com/apache/beam/pull/22953
[GitHub] [beam] lukecwik commented on a diff in pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r1036500319
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -709,4 +645,136 @@ public void processElement(ProcessContext c, BoundedWindow window) {
pipeline.run().waitUntilFinish();
}
+
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ NeedsRunner.class,
+ UsesTimersInParDo.class,
+ UsesTestStream.class,
+ UsesStatefulParDo.class,
+ UsesOnWindowExpiration.class
+ })
+ public void testMultipleLimitsAtOnceInGlobalWindowBatchSizeCountAndBatchSizeByteSize() {
+ // with using only one of the limits the result would be only 2 batches,
+ // if we have 3 both limit works
+ List<KV<String, String>> dataToUse =
+ Lists.newArrayList(
+ "a-1",
+ "a-2",
+ "a-3" + Strings.repeat("-", 100),
+ // byte size limit reached (BATCH_SIZE_BYTES = 25)
+ "b-4",
+ "b-5",
+ "b-6",
+ "b-7",
+ "b-8",
+ // count limit reached (BATCH_SIZE = 5)
+ "c-9")
+ .stream()
+ .map(s -> KV.of("key", s))
+ .collect(Collectors.toList());
+
+ // to ensure ordered firing
+ TestStream.Builder<KV<String, String>> streamBuilder =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+ .advanceWatermarkTo(Instant.EPOCH);
+
+ long offset = 0L;
+ for (KV<String, String> kv : dataToUse) {
Review Comment:
No, elements can be processed in parallel in any arbitrary order. Timestamps are used with windowing, which can hold/buffer inputs and produce outputs, but that isn't the case with test stream.
[GitHub] [beam] lukecwik commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1333161247
Run Java PreCommit
[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1243708064
Note to self: the same happens with https://github.com/a0x8o/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
[GitHub] [beam] github-actions[bot] commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1231059806
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1324607663
Run Java PreCommit
[GitHub] [beam] lukecwik commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1334046715
Run Java PreCommit
[GitHub] [beam] lukecwik commented on a diff in pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r1036500319
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java:
##########
@@ -709,4 +645,136 @@ public void processElement(ProcessContext c, BoundedWindow window) {
pipeline.run().waitUntilFinish();
}
+
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ NeedsRunner.class,
+ UsesTimersInParDo.class,
+ UsesTestStream.class,
+ UsesStatefulParDo.class,
+ UsesOnWindowExpiration.class
+ })
+ public void testMultipleLimitsAtOnceInGlobalWindowBatchSizeCountAndBatchSizeByteSize() {
+ // with using only one of the limits the result would be only 2 batches,
+ // if we have 3 both limit works
+ List<KV<String, String>> dataToUse =
+ Lists.newArrayList(
+ "a-1",
+ "a-2",
+ "a-3" + Strings.repeat("-", 100),
+ // byte size limit reached (BATCH_SIZE_BYTES = 25)
+ "b-4",
+ "b-5",
+ "b-6",
+ "b-7",
+ "b-8",
+ // count limit reached (BATCH_SIZE = 5)
+ "c-9")
+ .stream()
+ .map(s -> KV.of("key", s))
+ .collect(Collectors.toList());
+
+ // to ensure ordered firing
+ TestStream.Builder<KV<String, String>> streamBuilder =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+ .advanceWatermarkTo(Instant.EPOCH);
+
+ long offset = 0L;
+ for (KV<String, String> kv : dataToUse) {
Review Comment:
You're right, each addElements call is its own batch that needs to be processed.
[GitHub] [beam] lukecwik commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1332838348
Run Java Dataflow V2 ValidatesRunner
[GitHub] [beam] lukecwik commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
lukecwik commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1332837925
Run Dataflow ValidatesRunner
[GitHub] [beam] nbali commented on pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on PR #22953:
URL: https://github.com/apache/beam/pull/22953#issuecomment-1317942990
Run Java_GCP_IO_Direct PreCommit
[GitHub] [beam] lukecwik commented on a diff in pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r964007253
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java:
##########
@@ -113,6 +113,9 @@
// If user triggering is supplied, we will trigger the file write after this many records are
// written.
static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
+ // If user triggering is supplied, we will trigger the file write after this many bytes are
+ // written.
+ static final long FILE_TRIGGERING_BYTE_COUNT = 100 * (1L << 20); // 100MiB
Review Comment:
The default comes from this constant:
https://www.javadoc.io/static/com.google.cloud.bigdataoss/util/1.9.17/com/google/cloud/hadoop/util/AsyncWriteChannelOptions.html#UPLOAD_CHUNK_SIZE_DEFAULT
The user can override the default using this pipeline option: https://github.com/apache/beam/blob/b2a6f46fb21709cc1927ce1950a38a922dce0a35/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java#L91
[GitHub] [beam] nbali commented on a diff in pull request #22953: Fix for #22951
Posted by GitBox <gi...@apache.org>.
nbali commented on code in PR #22953:
URL: https://github.com/apache/beam/pull/22953#discussion_r964180547
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java:
##########
@@ -113,6 +113,9 @@
// If user triggering is supplied, we will trigger the file write after this many records are
// written.
static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
+ // If user triggering is supplied, we will trigger the file write after this many bytes are
+ // written.
+ static final long FILE_TRIGGERING_BYTE_COUNT = 100 * (1L << 20); // 100MiB
Review Comment:
done in [`bcd4ba9` (#22953)](https://github.com/apache/beam/pull/22953/commits/bcd4ba912a224aaba31be71b5eade04a2219de09)