Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/05 07:43:35 UTC
[GitHub] [beam] ihji opened a new pull request, #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
ihji opened a new pull request, #17280:
URL: https://github.com/apache/beam/pull/17280
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] robertwb commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
robertwb commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r844532488
##########
model/job-management/src/main/proto/beam_expansion_api.proto:
##########
@@ -46,6 +46,9 @@ message ExpansionRequest {
// A namespace (prefix) to use for the id of any newly created
// components.
string namespace = 3;
+
+ // (Optional) If this coder is set, overrides the output type of ExternalTransform
+ org.apache.beam.model.pipeline.v1.Coder output_coder_override = 4;
Review Comment:
This needs to be a map<tag, Coder> as a transform may have many outputs.
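To illustrate the point about multiple outputs, here is a minimal sketch of a request that carries one coder per output tag. `ExpansionRequestSketch` is a hypothetical stand-in for the proto message, not generated Beam code, and the tag names and coder ids are made up; the values are coder ids here because a later revision in this thread uses `map<string, string>`.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class ExpansionRequestSketch:
    """Hypothetical stand-in for the ExpansionRequest proto message."""
    namespace: str = ""
    # One requested coder id per local output tag (assumed shape).
    output_coder_override: Dict[str, str] = field(default_factory=dict)


request = ExpansionRequestSketch(namespace="external_1")
# A transform with two outputs can ask for a different coder on each tag.
request.output_coder_override["main"] = "coder_varint"
request.output_coder_override["errors"] = "coder_utf8"
```

A single `Coder` field could only describe one of these two outputs, which is the limitation the comment above points at.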
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -113,12 +115,12 @@ private static int getFreshNamespaceIndex() {
public MultiOutputExpandableTransform<InputT> withMultiOutputs() {
return new MultiOutputExpandableTransform<>(
- getUrn(), getPayload(), getEndpoint(), getNamespaceIndex());
+ getUrn(), getPayload(), getEndpoint(), getNamespaceIndex(), getOutputCoder());
Review Comment:
Not sure what this means. Should the coder be used for all outputs? We should probably disallow a non-null value here.
##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -431,10 +431,15 @@ def __init__(self, urn, payload, expansion_service=None):
self._payload = (
payload.payload() if isinstance(payload, PayloadBuilder) else payload)
self._expansion_service = expansion_service
+ self._output_coder = None
self._external_namespace = self._fresh_namespace()
self._inputs = {} # type: Dict[str, pvalue.PCollection]
self._outputs = {} # type: Dict[str, pvalue.PCollection]
+ def with_output_coder(self, output_coder):
Review Comment:
PTransforms already have `with_output_types`. Let's grab that and (if present) infer the coder rather than adding a new method.
##########
sdks/python/apache_beam/runners/portability/expansion_service_test.py:
##########
@@ -270,6 +270,20 @@ def from_runner_api_parameter(unused_ptransform, payload, unused_context):
return PayloadTransform(payload.decode('ascii'))
+@ptransform.PTransform.register_urn('sum_without_type', None)
+class SumWithoutTypeTransform(ptransform.PTransform):
+ def expand(self, pcoll):
+ return pcoll | beam.CombineGlobally(sum)
Review Comment:
Is it certain that we can never infer this? It would be better to use a type that we know we'll never be able to infer, e.g. one with a data dependency like `Map(lambda x: str_literal if x else int_literal)`, which is technically a union type but which we could ensure never actually is one.
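A small sketch of the data dependency described here, in plain Python without Beam. The function name and literals are hypothetical. Statically the return type is a union of `str` and `int`, but if the test only feeds truthy inputs, every output is a `str`, so a requested string coder would be safe even though inference cannot prove it.

```python
def str_or_int(x):
    # The return type depends on the value of x, so static inference
    # can only conclude Union[str, int].
    return "nonzero" if x else 0


# The test controls the inputs: with only truthy values the int branch
# is never taken.
truthy_inputs = [1, 2, 3]
observed_types = {type(str_or_int(x)).__name__ for x in truthy_inputs}
```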
##########
sdks/python/apache_beam/runners/portability/expansion_service.py:
##########
@@ -63,6 +64,10 @@ def with_pipeline(component, pcoll_id=None):
}
transform = with_pipeline(
ptransform.PTransform.from_runner_api(request.transform, context))
+ if request.output_coder_override.spec.urn:
+ output_coder = Coder.from_runner_api(
+ request.output_coder_override, context)
+ transform = transform.with_output_types(output_coder.to_type_hint())
Review Comment:
This could be unsafe. Should we at least make sure it's compatible? (This would have to happen after application...)
##########
model/job-management/src/main/proto/beam_expansion_api.proto:
##########
@@ -46,6 +46,9 @@ message ExpansionRequest {
// A namespace (prefix) to use for the id of any newly created
// components.
string namespace = 3;
+
+ // (Optional) If this coder is set, overrides the output type of ExternalTransform
Review Comment:
Let's let this be a request that the outputs have the given coders. This way the expansion service can do its best to satisfy the request, and if it can't the caller has one more chance to do its best to interpret the results.
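The request semantics suggested here could be sketched as follows. This is an illustration under assumptions, not the expansion service's implementation: `resolve_output_coders`, `unmet_requests`, and the coder ids are hypothetical names. The service honors a requested coder only where it can, and the caller then checks which requests were not met.

```python
from typing import Dict, Set


def resolve_output_coders(
    inferred: Dict[str, str],
    requested: Dict[str, str],
    satisfiable: Set[str],
) -> Dict[str, str]:
    """Service side: start from inferred coders, apply requests where possible."""
    resolved = dict(inferred)
    for tag, coder_id in requested.items():
        if tag in resolved and coder_id in satisfiable:
            resolved[tag] = coder_id
    return resolved


def unmet_requests(requested: Dict[str, str], resolved: Dict[str, str]) -> Set[str]:
    """Caller side: tags whose requested coder was not used."""
    return {tag for tag, coder_id in requested.items() if resolved.get(tag) != coder_id}


inferred = {"main": "coder_pickle", "errors": "coder_pickle"}
requested = {"main": "coder_varint", "errors": "coder_custom"}
resolved = resolve_output_coders(inferred, requested, satisfiable={"coder_varint"})
unmet = unmet_requests(requested, resolved)
```

For the tags left in `unmet`, the caller still gets a valid result and can decide how to interpret it.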
[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r852359639
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -126,12 +127,33 @@ PCollection<OutputT> toOutputCollection(Map<TupleTag<?>, PCollection> output) {
public MultiOutputExpandableTransform<InputT> withMultiOutputs() {
return new MultiOutputExpandableTransform<>(
- getUrn(), getPayload(), getEndpoint(), getClientFactory(), getNamespaceIndex());
+ getUrn(),
+ getPayload(),
+ getEndpoint(),
+ getClientFactory(),
+ getNamespaceIndex(),
+ getOutputCoders());
}
- public <T> SingleOutputExpandableTransform<InputT, T> withOutputType() {
+ public SingleOutputExpandableTransform<InputT, OutputT> withOutputCoder(Coder outputCoder) {
return new SingleOutputExpandableTransform<>(
- getUrn(), getPayload(), getEndpoint(), getClientFactory(), getNamespaceIndex());
+ getUrn(),
+ getPayload(),
+ getEndpoint(),
+ getClientFactory(),
+ getNamespaceIndex(),
+ ImmutableMap.of("0", outputCoder));
+ }
+
+ public SingleOutputExpandableTransform<InputT, OutputT> withOutputCoder(
Review Comment:
Ah, good catch. It doesn't make sense on a single-output transform. Moved to the multi-output variant.
##########
model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_expansion_api.proto:
##########
@@ -46,6 +46,12 @@ message ExpansionRequest {
// A namespace (prefix) to use for the id of any newly created
// components.
string namespace = 3;
+
+ // (Optional) Map from a local output tag to a coder id.
+ // If it is set, asks the expansion service to use the given
+ // coders for the output PCollections. Note that the request
+ // may not be fulfilled.
+ map<string, string> output_coder_override = 4;
Review Comment:
Done.
[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r850707272
##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -431,10 +431,15 @@ def __init__(self, urn, payload, expansion_service=None):
self._payload = (
payload.payload() if isinstance(payload, PayloadBuilder) else payload)
self._expansion_service = expansion_service
+ self._output_coder = None
self._external_namespace = self._fresh_namespace()
self._inputs = {} # type: Dict[str, pvalue.PCollection]
self._outputs = {} # type: Dict[str, pvalue.PCollection]
+ def with_output_coder(self, output_coder):
Review Comment:
Done.
[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r852359798
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -225,9 +251,25 @@ public OutputT expand(InputT input) {
}
}
+ ExpansionApi.ExpansionRequest.Builder requestBuilder =
+ ExpansionApi.ExpansionRequest.newBuilder();
+ if (!outputCoders.isEmpty()) {
Review Comment:
Done.
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -225,9 +251,25 @@ public OutputT expand(InputT input) {
}
}
+ ExpansionApi.ExpansionRequest.Builder requestBuilder =
+ ExpansionApi.ExpansionRequest.newBuilder();
+ if (!outputCoders.isEmpty()) {
+ requestBuilder.putAllOutputCoderOverride(
+ outputCoders.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ v -> {
Review Comment:
Done.
[GitHub] [beam] ihji commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1099589108
Run Java PreCommit
[GitHub] [beam] ihji merged pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji merged PR #17280:
URL: https://github.com/apache/beam/pull/17280
[GitHub] [beam] codecov[bot] commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1088400965
# [Codecov](https://codecov.io/gh/apache/beam/pull/17280?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#17280](https://codecov.io/gh/apache/beam/pull/17280?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f9b55b6) into [master](https://codecov.io/gh/apache/beam/commit/ca47cd83670a77507abcf5c54429966743af3ec0?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ca47cd8) will **increase** coverage by `0.04%`.
> The diff coverage is `50.00%`.
```diff
@@ Coverage Diff @@
## master #17280 +/- ##
==========================================
+ Coverage 73.98% 74.02% +0.04%
==========================================
Files 674 675 +1
Lines 88558 88878 +320
==========================================
+ Hits 65521 65795 +274
- Misses 21916 21962 +46
Partials 1121 1121
```
| Flag | Coverage Δ | |
|---|---|---|
| python | `83.68% <50.00%> (+<0.01%)` | :arrow_up: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/beam/pull/17280?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...ache\_beam/runners/portability/expansion\_service.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9leHBhbnNpb25fc2VydmljZS5weQ==) | `89.58% <50.00%> (-3.60%)` | :arrow_down: |
| [...ers/portability/fn\_api\_runner/watermark\_manager.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3dhdGVybWFya19tYW5hZ2VyLnB5) | `93.33% <0.00%> (-2.67%)` | :arrow_down: |
| [.../python/apache\_beam/transforms/periodicsequence.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9wZXJpb2RpY3NlcXVlbmNlLnB5) | `96.72% <0.00%> (-1.64%)` | :arrow_down: |
| [...eam/runners/portability/fn\_api\_runner/execution.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2V4ZWN1dGlvbi5weQ==) | `92.25% <0.00%> (-1.09%)` | :arrow_down: |
| [...eam/runners/portability/fn\_api\_runner/fn\_runner.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2ZuX3J1bm5lci5weQ==) | `90.01% <0.00%> (-0.90%)` | :arrow_down: |
| [sdks/python/apache\_beam/transforms/combiners.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jb21iaW5lcnMucHk=) | `93.42% <0.00%> (-0.20%)` | :arrow_down: |
| [sdks/python/apache\_beam/coders/row\_coder.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vY29kZXJzL3Jvd19jb2Rlci5weQ==) | `97.00% <0.00%> (-0.15%)` | :arrow_down: |
| [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.51% <0.00%> (-0.13%)` | :arrow_down: |
| [...nners/portability/fn\_api\_runner/worker\_handlers.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3dvcmtlcl9oYW5kbGVycy5weQ==) | `79.34% <0.00%> (-0.11%)` | :arrow_down: |
| [sdks/python/apache\_beam/typehints/typehints.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL3R5cGVoaW50cy5weQ==) | `93.37% <0.00%> (-0.07%)` | :arrow_down: |
| ... and [12 more](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
[GitHub] [beam] ihji commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1089256631
Run Java PreCommit
[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r850705733
##########
model/job-management/src/main/proto/beam_expansion_api.proto:
##########
@@ -46,6 +46,9 @@ message ExpansionRequest {
// A namespace (prefix) to use for the id of any newly created
// components.
string namespace = 3;
+
+ // (Optional) If this coder is set, overrides the output type of ExternalTransform
+ org.apache.beam.model.pipeline.v1.Coder output_coder_override = 4;
Review Comment:
Done.
[GitHub] [beam] ihji commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1101799740
Run Java PreCommit
[GitHub] [beam] ihji commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1089449959
R: @robertwb
[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r850711768
##########
sdks/python/apache_beam/runners/portability/expansion_service.py:
##########
@@ -63,6 +64,10 @@ def with_pipeline(component, pcoll_id=None):
}
transform = with_pipeline(
ptransform.PTransform.from_runner_api(request.transform, context))
+ if request.output_coder_override.spec.urn:
+ output_coder = Coder.from_runner_api(
+ request.output_coder_override, context)
+ transform = transform.with_output_types(output_coder.to_type_hint())
Review Comment:
It looks like we already do a type compatibility check when the transform is applied to the pipeline: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/pipeline.py#L711
An incompatible output type assignment will raise an error at https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/expansion_service.py#L75
[GitHub] [beam] ihji commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1099621047
@robertwb PTAL
[GitHub] [beam] ihji commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1089544386
Run Java PreCommit
[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r852360154
##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -498,11 +502,24 @@ def expand(self, pvalueish):
spec=beam_runner_api_pb2.FunctionSpec(
urn=common_urns.primitives.IMPULSE.urn),
outputs={'out': transform_proto.inputs[tag]}))
+ output_coder = None
+ if self._type_hints.output_types:
+ if self._type_hints.output_types[0]:
+ output_coder = dict((str(k), context.coder_id_from_element_type(v))
+ for k,
Review Comment:
Thanks. It looks better with the `(k, v)` syntax.
##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -498,11 +502,24 @@ def expand(self, pvalueish):
spec=beam_runner_api_pb2.FunctionSpec(
urn=common_urns.primitives.IMPULSE.urn),
outputs={'out': transform_proto.inputs[tag]}))
+ output_coder = None
Review Comment:
Done.
[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r850707033
##########
sdks/python/apache_beam/runners/portability/expansion_service_test.py:
##########
@@ -270,6 +270,20 @@ def from_runner_api_parameter(unused_ptransform, payload, unused_context):
return PayloadTransform(payload.decode('ascii'))
+@ptransform.PTransform.register_urn('sum_without_type', None)
+class SumWithoutTypeTransform(ptransform.PTransform):
+ def expand(self, pcoll):
+ return pcoll | beam.CombineGlobally(sum)
Review Comment:
Changed the test transform.
[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r850705908
##########
model/job-management/src/main/proto/beam_expansion_api.proto:
##########
@@ -46,6 +46,9 @@ message ExpansionRequest {
// A namespace (prefix) to use for the id of any newly created
// components.
string namespace = 3;
+
+ // (Optional) If this coder is set, overrides the output type of ExternalTransform
Review Comment:
Done.
[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r852360778
##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -433,6 +434,9 @@ def __init__(self, urn, payload, expansion_service=None):
self._inputs = {} # type: Dict[str, pvalue.PCollection]
self._outputs = {} # type: Dict[str, pvalue.PCollection]
+ def with_output_types(self, *args, **kwargs):
Review Comment:
Without it, the default implementation only allows a single argument.
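A minimal sketch of why the override is needed, using hypothetical classes rather than Beam's actual hierarchy (the class names below are assumptions). If an intermediate class narrows `with_output_types` to a single type hint, a subclass that wants several outputs has to re-widen the signature and delegate to the more general base implementation.

```python
class HintsBase:
    """Hypothetical base: accepts any number of output type hints."""

    def with_output_types(self, *args, **kwargs):
        self.output_types = (args, kwargs)
        return self


class SingleOutputTransform(HintsBase):
    """Hypothetical default: narrows the signature to one type hint."""

    def with_output_types(self, type_hint):
        return super().with_output_types(type_hint)


class ExternalLikeTransform(SingleOutputTransform):
    """Re-widens the signature so per-output hints can be passed."""

    def with_output_types(self, *args, **kwargs):
        # Skip the single-argument override and call the general base.
        return HintsBase.with_output_types(self, *args, **kwargs)


multi = ExternalLikeTransform().with_output_types(int, str)

try:
    SingleOutputTransform().with_output_types(int, str)
    single_accepts_two = True
except TypeError:
    single_accepts_two = False
```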
[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r850712625
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -113,12 +115,12 @@ private static int getFreshNamespaceIndex() {
public MultiOutputExpandableTransform<InputT> withMultiOutputs() {
return new MultiOutputExpandableTransform<>(
- getUrn(), getPayload(), getEndpoint(), getNamespaceIndex());
+ getUrn(), getPayload(), getEndpoint(), getNamespaceIndex(), getOutputCoder());
Review Comment:
Changed to a map instead of a singleton coder value.
[GitHub] [beam] robertwb commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
robertwb commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r852260611
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -126,12 +127,33 @@ PCollection<OutputT> toOutputCollection(Map<TupleTag<?>, PCollection> output) {
public MultiOutputExpandableTransform<InputT> withMultiOutputs() {
return new MultiOutputExpandableTransform<>(
- getUrn(), getPayload(), getEndpoint(), getClientFactory(), getNamespaceIndex());
+ getUrn(),
+ getPayload(),
+ getEndpoint(),
+ getClientFactory(),
+ getNamespaceIndex(),
+ getOutputCoders());
}
- public <T> SingleOutputExpandableTransform<InputT, T> withOutputType() {
+ public SingleOutputExpandableTransform<InputT, OutputT> withOutputCoder(Coder outputCoder) {
return new SingleOutputExpandableTransform<>(
- getUrn(), getPayload(), getEndpoint(), getClientFactory(), getNamespaceIndex());
+ getUrn(),
+ getPayload(),
+ getEndpoint(),
+ getClientFactory(),
+ getNamespaceIndex(),
+ ImmutableMap.of("0", outputCoder));
+ }
+
+ public SingleOutputExpandableTransform<InputT, OutputT> withOutputCoder(
Review Comment:
Does this make sense on SingleOutputExpandableTransform? Likely it should only be on the Multi variant.
##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -433,6 +434,9 @@ def __init__(self, urn, payload, expansion_service=None):
self._inputs = {} # type: Dict[str, pvalue.PCollection]
self._outputs = {} # type: Dict[str, pvalue.PCollection]
+ def with_output_types(self, *args, **kwargs):
Review Comment:
Is this needed? (Shouldn't it be inherited?)
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -225,9 +251,25 @@ public OutputT expand(InputT input) {
}
}
+ ExpansionApi.ExpansionRequest.Builder requestBuilder =
+ ExpansionApi.ExpansionRequest.newBuilder();
+ if (!outputCoders.isEmpty()) {
Review Comment:
No need to gate on this--if it's empty it'll still do the right thing.
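For illustration only, a minimal Python sketch of why the guard is redundant. It assumes a plain dict as a stand-in for the request's map field; `build_request` and the dict layout are hypothetical and are not part of the PR or of the protobuf API. Copying an empty mapping into the field leaves it unchanged, so the unguarded call handles both cases.

```python
def build_request(output_coders):
    # Plain dict standing in for the request's map field (an assumption
    # made for this sketch, not the real ExpansionRequest builder).
    request = {'output_coder_override': {}}
    # No emptiness check: updating with an empty mapping is a no-op.
    request['output_coder_override'].update(output_coders)
    return request


empty_request = build_request({})
filled_request = build_request({'0': 'coder_id_1'})
```

The same reasoning would apply to the Java builder if its bulk-put call accepts an empty map, which is what the comment above suggests.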
##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -498,11 +502,24 @@ def expand(self, pvalueish):
spec=beam_runner_api_pb2.FunctionSpec(
urn=common_urns.primitives.IMPULSE.urn),
outputs={'out': transform_proto.inputs[tag]}))
+ output_coder = None
+ if self._type_hints.output_types:
+ if self._type_hints.output_types[0]:
+ output_coder = dict((str(k), context.coder_id_from_element_type(v))
+ for k,
Review Comment:
You can do `for (k, v) in ...` to make yapf happy. (Same below.)
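A minimal sketch of the suggested form, under stated assumptions: `coder_id_for` and `output_coder_ids` are hypothetical helpers standing in for the context lookup in the diff, and the input mapping is made up for the example. Parenthesizing the loop target gives the formatter a single unit to keep together instead of splitting after `k,`.

```python
def coder_id_for(element_type):
    # Hypothetical stand-in for the real coder-id lookup in the diff.
    return 'coder_' + element_type.__name__


def output_coder_ids(output_types):
    # `for (k, v) in ...` keeps the loop target together when the
    # expression has to be wrapped across lines.
    return dict(
        (str(k), coder_id_for(v)) for (k, v) in output_types.items())


coders = output_coder_ids({0: int, 'main': str})
```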
##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -225,9 +251,25 @@ public OutputT expand(InputT input) {
}
}
+ ExpansionApi.ExpansionRequest.Builder requestBuilder =
+ ExpansionApi.ExpansionRequest.newBuilder();
+ if (!outputCoders.isEmpty()) {
+ requestBuilder.putAllOutputCoderOverride(
+ outputCoders.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ v -> {
Review Comment:
The variable `v` makes it sound like it's just the value. Maybe `kv` or `e` for entry?
##########
model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_expansion_api.proto:
##########
@@ -46,6 +46,12 @@ message ExpansionRequest {
// A namespace (prefix) to use for the id of any newly created
// components.
string namespace = 3;
+
+ // (Optional) Map from a local output tag to a coder id.
+ // If it is set, asks the expansion service to use the given
+ // coders for the output PCollections. Note that the request
+ // may not be fulfilled.
+ map<string, string> output_coder_override = 4;
Review Comment:
Maybe call this output_coder_requests?
##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -498,11 +502,24 @@ def expand(self, pvalueish):
spec=beam_runner_api_pb2.FunctionSpec(
urn=common_urns.primitives.IMPULSE.urn),
outputs={'out': transform_proto.inputs[tag]}))
+ output_coder = None
Review Comment:
Nit: output_coder*s*.
[GitHub] [beam] ihji commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1099661085
Run Java PreCommit
[GitHub] [beam] ihji commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1099586721
Run Java PreCommit
[GitHub] [beam] ihji commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest
Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1101831266
Run Java PreCommit