Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/05 07:43:35 UTC

[GitHub] [beam] ihji opened a new pull request, #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

ihji opened a new pull request, #17280:
URL: https://github.com/apache/beam/pull/17280



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] robertwb commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
robertwb commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r844532488


##########
model/job-management/src/main/proto/beam_expansion_api.proto:
##########
@@ -46,6 +46,9 @@ message ExpansionRequest {
   // A namespace (prefix) to use for the id of any newly created
   // components.
   string namespace = 3;
+
+  // (Optional) If this coder is set, overrides the output type of ExternalTransform
+  org.apache.beam.model.pipeline.v1.Coder output_coder_override = 4;

Review Comment:
   This needs to be a map<tag, Coder> as a transform may have many outputs. 
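
To illustrate the point about multiple outputs, here is a minimal sketch in plain Python (not Beam code). The tag names and coder ids are hypothetical; the sketch only shows why a per-tag mapping is needed once a transform produces more than one output.

```python
from typing import Dict, Optional

# Hypothetical override map: local output tag -> coder id.
# Neither the tags nor the coder ids come from the PR.
output_coder_override: Dict[str, str] = {
    "main": "varint_coder",
    "errors": "bytes_coder",
}


def requested_coder(overrides: Dict[str, str], tag: str) -> Optional[str]:
    """Return the coder id requested for `tag`, or None if none was requested."""
    return overrides.get(tag)
```

A single coder field could describe only one of these outputs; the map leaves untagged outputs without a request.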



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -113,12 +115,12 @@ private static int getFreshNamespaceIndex() {
 
     public MultiOutputExpandableTransform<InputT> withMultiOutputs() {
       return new MultiOutputExpandableTransform<>(
-          getUrn(), getPayload(), getEndpoint(), getNamespaceIndex());
+          getUrn(), getPayload(), getEndpoint(), getNamespaceIndex(), getOutputCoder());

Review Comment:
   It's not clear what this means. Should the coder be used for all outputs? We should probably disallow a non-null coder here.



##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -431,10 +431,15 @@ def __init__(self, urn, payload, expansion_service=None):
     self._payload = (
         payload.payload() if isinstance(payload, PayloadBuilder) else payload)
     self._expansion_service = expansion_service
+    self._output_coder = None
     self._external_namespace = self._fresh_namespace()
     self._inputs = {}  # type: Dict[str, pvalue.PCollection]
     self._outputs = {}  # type: Dict[str, pvalue.PCollection]
 
+  def with_output_coder(self, output_coder):

Review Comment:
   PTransforms already have a with_output_types method. Let's grab that and (if present) infer the coder from it rather than adding a new method.



##########
sdks/python/apache_beam/runners/portability/expansion_service_test.py:
##########
@@ -270,6 +270,20 @@ def from_runner_api_parameter(unused_ptransform, payload, unused_context):
     return PayloadTransform(payload.decode('ascii'))
 
 
+@ptransform.PTransform.register_urn('sum_without_type', None)
+class SumWithoutTypeTransform(ptransform.PTransform):
+  def expand(self, pcoll):
+    return pcoll | beam.CombineGlobally(sum)

Review Comment:
   Can we really never infer this? It would be better to use a type that we know we'll never be able to infer, e.g. one with a data dependency like "Map(lambda x: str_literal if x else int_literal)", which would technically be a union type but which we could ensure is not one in practice.
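
The kind of data dependency described in this comment can be sketched in plain Python. This is not the test transform from the PR; `branch` and `observed_output_types` are hypothetical names. Static inference would have to report a union of `str` and `int` for `branch`, while a test that only feeds falsy inputs only ever produces `int` values.

```python
def branch(x):
    # Returns a str for truthy inputs and an int otherwise, so the
    # result type depends on the data rather than on the code alone.
    return "positive" if x else 0


def observed_output_types(fn, inputs):
    """Collect the names of the concrete result types seen for `inputs`."""
    return {type(fn(x)).__name__ for x in inputs}
```

Calling `observed_output_types(branch, [0, 1])` exercises both branches, whereas an input list of only falsy values exercises just the integer branch.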



##########
sdks/python/apache_beam/runners/portability/expansion_service.py:
##########
@@ -63,6 +64,10 @@ def with_pipeline(component, pcoll_id=None):
       }
       transform = with_pipeline(
           ptransform.PTransform.from_runner_api(request.transform, context))
+      if request.output_coder_override.spec.urn:
+        output_coder = Coder.from_runner_api(
+            request.output_coder_override, context)
+        transform = transform.with_output_types(output_coder.to_type_hint())

Review Comment:
   This could be unsafe. Should we at least make sure it's compatible? (This would have to happen after application...)



##########
model/job-management/src/main/proto/beam_expansion_api.proto:
##########
@@ -46,6 +46,9 @@ message ExpansionRequest {
   // A namespace (prefix) to use for the id of any newly created
   // components.
   string namespace = 3;
+
+  // (Optional) If this coder is set, overrides the output type of ExternalTransform

Review Comment:
   Let's let this be a request that the outputs have the given coders. This way the expansion service can do its best to satisfy the request, and if it can't the caller has one more chance to do its best to interpret the results. 
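
A rough sketch of the "request, not override" semantics suggested here, in plain Python. Everything in it is an assumption for illustration: the function name, the idea of a `supported` set, and the returned `unfulfilled` map are not part of the expansion API.

```python
from typing import Dict, Set, Tuple


def apply_coder_requests(
    inferred: Dict[str, str],
    requested: Dict[str, str],
    supported: Set[str],
) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Return (final coder id per output tag, requests that were not honored)."""
    final = dict(inferred)
    unfulfilled = {}
    for tag, coder_id in requested.items():
        # Honor a request only for a known output and a coder we can handle.
        if tag in final and coder_id in supported:
            final[tag] = coder_id
        else:
            unfulfilled[tag] = coder_id
    return final, unfulfilled
```

The caller can inspect the second value and decide how to interpret any output whose requested coder was not applied.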





[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r852359639


##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -126,12 +127,33 @@ PCollection<OutputT> toOutputCollection(Map<TupleTag<?>, PCollection> output) {
 
     public MultiOutputExpandableTransform<InputT> withMultiOutputs() {
       return new MultiOutputExpandableTransform<>(
-          getUrn(), getPayload(), getEndpoint(), getClientFactory(), getNamespaceIndex());
+          getUrn(),
+          getPayload(),
+          getEndpoint(),
+          getClientFactory(),
+          getNamespaceIndex(),
+          getOutputCoders());
     }
 
-    public <T> SingleOutputExpandableTransform<InputT, T> withOutputType() {
+    public SingleOutputExpandableTransform<InputT, OutputT> withOutputCoder(Coder outputCoder) {
       return new SingleOutputExpandableTransform<>(
-          getUrn(), getPayload(), getEndpoint(), getClientFactory(), getNamespaceIndex());
+          getUrn(),
+          getPayload(),
+          getEndpoint(),
+          getClientFactory(),
+          getNamespaceIndex(),
+          ImmutableMap.of("0", outputCoder));
+    }
+
+    public SingleOutputExpandableTransform<InputT, OutputT> withOutputCoder(

Review Comment:
   Ah, good catch. It doesn't make sense on a single-output transform. Moved to the multi-output variant.



##########
model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_expansion_api.proto:
##########
@@ -46,6 +46,12 @@ message ExpansionRequest {
   // A namespace (prefix) to use for the id of any newly created
   // components.
   string namespace = 3;
+
+  // (Optional) Map from a local output tag to a coder id.
+  // If it is set, asks the expansion service to use the given
+  // coders for the output PCollections. Note that the request
+  // may not be fulfilled.
+  map<string, string> output_coder_override = 4;

Review Comment:
   Done.





[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r850707272


##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -431,10 +431,15 @@ def __init__(self, urn, payload, expansion_service=None):
     self._payload = (
         payload.payload() if isinstance(payload, PayloadBuilder) else payload)
     self._expansion_service = expansion_service
+    self._output_coder = None
     self._external_namespace = self._fresh_namespace()
     self._inputs = {}  # type: Dict[str, pvalue.PCollection]
     self._outputs = {}  # type: Dict[str, pvalue.PCollection]
 
+  def with_output_coder(self, output_coder):

Review Comment:
   Done.





[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r852359798


##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -225,9 +251,25 @@ public OutputT expand(InputT input) {
         }
       }
 
+      ExpansionApi.ExpansionRequest.Builder requestBuilder =
+          ExpansionApi.ExpansionRequest.newBuilder();
+      if (!outputCoders.isEmpty()) {

Review Comment:
   Done.



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -225,9 +251,25 @@ public OutputT expand(InputT input) {
         }
       }
 
+      ExpansionApi.ExpansionRequest.Builder requestBuilder =
+          ExpansionApi.ExpansionRequest.newBuilder();
+      if (!outputCoders.isEmpty()) {
+        requestBuilder.putAllOutputCoderOverride(
+            outputCoders.entrySet().stream()
+                .collect(
+                    Collectors.toMap(
+                        Map.Entry::getKey,
+                        v -> {

Review Comment:
   Done.





[GitHub] [beam] ihji commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1099589108

   Run Java PreCommit




[GitHub] [beam] ihji merged pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji merged PR #17280:
URL: https://github.com/apache/beam/pull/17280




[GitHub] [beam] codecov[bot] commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1088400965

   # [Codecov](https://codecov.io/gh/apache/beam/pull/17280?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#17280](https://codecov.io/gh/apache/beam/pull/17280?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f9b55b6) into [master](https://codecov.io/gh/apache/beam/commit/ca47cd83670a77507abcf5c54429966743af3ec0?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ca47cd8) will **increase** coverage by `0.04%`.
   > The diff coverage is `50.00%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #17280      +/-   ##
   ==========================================
   + Coverage   73.98%   74.02%   +0.04%     
   ==========================================
     Files         674      675       +1     
     Lines       88558    88878     +320     
   ==========================================
   + Hits        65521    65795     +274     
   - Misses      21916    21962      +46     
     Partials     1121     1121              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.68% <50.00%> (+<0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/17280?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ache\_beam/runners/portability/expansion\_service.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9leHBhbnNpb25fc2VydmljZS5weQ==) | `89.58% <50.00%> (-3.60%)` | :arrow_down: |
   | [...ers/portability/fn\_api\_runner/watermark\_manager.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3dhdGVybWFya19tYW5hZ2VyLnB5) | `93.33% <0.00%> (-2.67%)` | :arrow_down: |
   | [.../python/apache\_beam/transforms/periodicsequence.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9wZXJpb2RpY3NlcXVlbmNlLnB5) | `96.72% <0.00%> (-1.64%)` | :arrow_down: |
   | [...eam/runners/portability/fn\_api\_runner/execution.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2V4ZWN1dGlvbi5weQ==) | `92.25% <0.00%> (-1.09%)` | :arrow_down: |
   | [...eam/runners/portability/fn\_api\_runner/fn\_runner.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2ZuX3J1bm5lci5weQ==) | `90.01% <0.00%> (-0.90%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/combiners.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jb21iaW5lcnMucHk=) | `93.42% <0.00%> (-0.20%)` | :arrow_down: |
   | [sdks/python/apache\_beam/coders/row\_coder.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vY29kZXJzL3Jvd19jb2Rlci5weQ==) | `97.00% <0.00%> (-0.15%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.51% <0.00%> (-0.13%)` | :arrow_down: |
   | [...nners/portability/fn\_api\_runner/worker\_handlers.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3dvcmtlcl9oYW5kbGVycy5weQ==) | `79.34% <0.00%> (-0.11%)` | :arrow_down: |
   | [sdks/python/apache\_beam/typehints/typehints.py](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL3R5cGVoaW50cy5weQ==) | `93.37% <0.00%> (-0.07%)` | :arrow_down: |
   | ... and [12 more](https://codecov.io/gh/apache/beam/pull/17280/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [beam] ihji commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1089256631

   Run Java PreCommit




[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r850705733


##########
model/job-management/src/main/proto/beam_expansion_api.proto:
##########
@@ -46,6 +46,9 @@ message ExpansionRequest {
   // A namespace (prefix) to use for the id of any newly created
   // components.
   string namespace = 3;
+
+  // (Optional) If this coder is set, overrides the output type of ExternalTransform
+  org.apache.beam.model.pipeline.v1.Coder output_coder_override = 4;

Review Comment:
   Done.





[GitHub] [beam] ihji commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1101799740

   Run Java PreCommit




[GitHub] [beam] ihji commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1089449959

   R: @robertwb 




[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r850711768


##########
sdks/python/apache_beam/runners/portability/expansion_service.py:
##########
@@ -63,6 +64,10 @@ def with_pipeline(component, pcoll_id=None):
       }
       transform = with_pipeline(
           ptransform.PTransform.from_runner_api(request.transform, context))
+      if request.output_coder_override.spec.urn:
+        output_coder = Coder.from_runner_api(
+            request.output_coder_override, context)
+        transform = transform.with_output_types(output_coder.to_type_hint())

Review Comment:
   It looks like we're already doing a type compatibility check when the transform is applied to the pipeline: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/pipeline.py#L711
   
   An incompatible output type assignment will throw an error at https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/expansion_service.py#L75
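
As a simplified illustration of what such a compatibility check does, here is a plain-Python sketch. It is not Beam's type-hint logic: it assumes `issubclass` as the compatibility rule and uses `object` to mean "nothing could be inferred", and `check_output_override` is a hypothetical name.

```python
def check_output_override(inferred_type: type, requested_type: type) -> type:
    """Accept `requested_type` only if the inferred type is compatible with it."""
    # `object` stands in for "nothing could be inferred" in this sketch.
    if inferred_type is object:
        return requested_type
    if not issubclass(inferred_type, requested_type):
        raise TypeError(
            f"requested output type {requested_type.__name__} is incompatible "
            f"with inferred type {inferred_type.__name__}")
    return requested_type
```

With this rule, a requested type is accepted when nothing was inferred or when the inferred type is a subclass of it, and rejected with a `TypeError` otherwise.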





[GitHub] [beam] ihji commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1099621047

   @robertwb PTAL




[GitHub] [beam] ihji commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1089544386

   Run Java PreCommit




[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r852360154


##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -498,11 +502,24 @@ def expand(self, pvalueish):
               spec=beam_runner_api_pb2.FunctionSpec(
                   urn=common_urns.primitives.IMPULSE.urn),
               outputs={'out': transform_proto.inputs[tag]}))
+    output_coder = None
+    if self._type_hints.output_types:
+      if self._type_hints.output_types[0]:
+        output_coder = dict((str(k), context.coder_id_from_element_type(v))
+                            for k,

Review Comment:
   Thanks. Better looking with (k, v) syntax.
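
The "(k, v) syntax" presumably refers to parenthesized tuple unpacking in the loop target. A minimal sketch of that style, assuming a mapping from output tag to element type; `coder_ids_by_tag` and the `coder_id_for` callback are hypothetical stand-ins and not the code from the PR:

```python
from typing import Any, Callable, Dict


def coder_ids_by_tag(
    output_types: Dict[Any, type],
    coder_id_for: Callable[[type], str],
) -> Dict[str, str]:
    """Map each output tag (as a string) to the coder id for its element type."""
    return {str(k): coder_id_for(v) for (k, v) in output_types.items()}
```

Keeping `(k, v)` in parentheses lets a formatter treat the loop target as one unit instead of splitting it across lines.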



##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -498,11 +502,24 @@ def expand(self, pvalueish):
               spec=beam_runner_api_pb2.FunctionSpec(
                   urn=common_urns.primitives.IMPULSE.urn),
               outputs={'out': transform_proto.inputs[tag]}))
+    output_coder = None

Review Comment:
   Done.





[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r850707033


##########
sdks/python/apache_beam/runners/portability/expansion_service_test.py:
##########
@@ -270,6 +270,20 @@ def from_runner_api_parameter(unused_ptransform, payload, unused_context):
     return PayloadTransform(payload.decode('ascii'))
 
 
+@ptransform.PTransform.register_urn('sum_without_type', None)
+class SumWithoutTypeTransform(ptransform.PTransform):
+  def expand(self, pcoll):
+    return pcoll | beam.CombineGlobally(sum)

Review Comment:
   Changed the test transform.





[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r850705908


##########
model/job-management/src/main/proto/beam_expansion_api.proto:
##########
@@ -46,6 +46,9 @@ message ExpansionRequest {
   // A namespace (prefix) to use for the id of any newly created
   // components.
   string namespace = 3;
+
+  // (Optional) If this coder is set, overrides the output type of ExternalTransform

Review Comment:
   Done.





[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r852360778


##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -433,6 +434,9 @@ def __init__(self, urn, payload, expansion_service=None):
     self._inputs = {}  # type: Dict[str, pvalue.PCollection]
     self._outputs = {}  # type: Dict[str, pvalue.PCollection]
 
+  def with_output_types(self, *args, **kwargs):

Review Comment:
   Without it, the default implementation only allows a single argument.
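
A small sketch of why the override matters, using stand-in classes rather than Beam's own. `BaseTransform` and `MultiOutputTransform` are hypothetical; the only assumption carried over from the comment is that the default method takes a single type hint.

```python
class BaseTransform:
    def with_output_types(self, type_hint):
        # Default behaviour in this sketch: exactly one positional hint.
        self.output_types = ((type_hint,), {})
        return self


class MultiOutputTransform(BaseTransform):
    def with_output_types(self, *args, **kwargs):
        # Accept positional hints plus keyword hints keyed by output tag.
        self.output_types = (args, kwargs)
        return self
```

With the override, a call such as `MultiOutputTransform().with_output_types(int, errors=str)` records hints for several outputs, while the same call on `BaseTransform` raises a `TypeError`.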





[GitHub] [beam] ihji commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r850712625


##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -113,12 +115,12 @@ private static int getFreshNamespaceIndex() {
 
     public MultiOutputExpandableTransform<InputT> withMultiOutputs() {
       return new MultiOutputExpandableTransform<>(
-          getUrn(), getPayload(), getEndpoint(), getNamespaceIndex());
+          getUrn(), getPayload(), getEndpoint(), getNamespaceIndex(), getOutputCoder());

Review Comment:
   Changed to a map instead of a singleton coder value.





[GitHub] [beam] robertwb commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
robertwb commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r852260611


##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -126,12 +127,33 @@ PCollection<OutputT> toOutputCollection(Map<TupleTag<?>, PCollection> output) {
 
     public MultiOutputExpandableTransform<InputT> withMultiOutputs() {
       return new MultiOutputExpandableTransform<>(
-          getUrn(), getPayload(), getEndpoint(), getClientFactory(), getNamespaceIndex());
+          getUrn(),
+          getPayload(),
+          getEndpoint(),
+          getClientFactory(),
+          getNamespaceIndex(),
+          getOutputCoders());
     }
 
-    public <T> SingleOutputExpandableTransform<InputT, T> withOutputType() {
+    public SingleOutputExpandableTransform<InputT, OutputT> withOutputCoder(Coder outputCoder) {
       return new SingleOutputExpandableTransform<>(
-          getUrn(), getPayload(), getEndpoint(), getClientFactory(), getNamespaceIndex());
+          getUrn(),
+          getPayload(),
+          getEndpoint(),
+          getClientFactory(),
+          getNamespaceIndex(),
+          ImmutableMap.of("0", outputCoder));
+    }
+
+    public SingleOutputExpandableTransform<InputT, OutputT> withOutputCoder(

Review Comment:
   Does this make sense on SingleOutputExpandableTransform? Likely it should only be on the Multi variant. 



##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -433,6 +434,9 @@ def __init__(self, urn, payload, expansion_service=None):
     self._inputs = {}  # type: Dict[str, pvalue.PCollection]
     self._outputs = {}  # type: Dict[str, pvalue.PCollection]
 
+  def with_output_types(self, *args, **kwargs):

Review Comment:
   Is this needed? (Shouldn't it be inherited?)



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -225,9 +251,25 @@ public OutputT expand(InputT input) {
         }
       }
 
+      ExpansionApi.ExpansionRequest.Builder requestBuilder =
+          ExpansionApi.ExpansionRequest.newBuilder();
+      if (!outputCoders.isEmpty()) {

Review Comment:
   No need to gate on this; if the map is empty, it'll still do the right thing.



##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -498,11 +502,24 @@ def expand(self, pvalueish):
               spec=beam_runner_api_pb2.FunctionSpec(
                   urn=common_urns.primitives.IMPULSE.urn),
               outputs={'out': transform_proto.inputs[tag]}))
+    output_coder = None
+    if self._type_hints.output_types:
+      if self._type_hints.output_types[0]:
+        output_coder = dict((str(k), context.coder_id_from_element_type(v))
+                            for k,

Review Comment:
   You can do `for (k, v) in ...` to make yapf happy. (Same below.)
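
A minimal sketch of the suggested loop form, using a plain dict in place of the transform's type hints. `coder_id_for` is a hypothetical stand-in for `context.coder_id_from_element_type`, and the sample tags and types are made up for illustration.

```python
def coder_id_for(element_type):
    # Hypothetical stand-in for context.coder_id_from_element_type.
    return "coder_" + element_type.__name__


def build_output_coders(output_types):
    # The parenthesized target `(k, v)` is the form the reviewer suggests,
    # so that a formatter does not split `k, v` across lines.
    return {str(k): coder_id_for(v) for (k, v) in output_types.items()}


output_coders = build_output_coders({0: int, "side": str})
```

The parenthesized and bare targets behave identically at runtime; only the formatting differs.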



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -225,9 +251,25 @@ public OutputT expand(InputT input) {
         }
       }
 
+      ExpansionApi.ExpansionRequest.Builder requestBuilder =
+          ExpansionApi.ExpansionRequest.newBuilder();
+      if (!outputCoders.isEmpty()) {
+        requestBuilder.putAllOutputCoderOverride(
+            outputCoders.entrySet().stream()
+                .collect(
+                    Collectors.toMap(
+                        Map.Entry::getKey,
+                        v -> {

Review Comment:
   The variable `v` makes it sound like it's just the value. Maybe `kv` or `e` for entry?



##########
model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_expansion_api.proto:
##########
@@ -46,6 +46,12 @@ message ExpansionRequest {
   // A namespace (prefix) to use for the id of any newly created
   // components.
   string namespace = 3;
+
+  // (Optional) Map from a local output tag to a coder id.
+  // If it is set, asks the expansion service to use the given
+  // coders for the output PCollections. Note that the request
+  // may not be fulfilled.
+  map<string, string> output_coder_override = 4;

Review Comment:
   Maybe call this output_coder_requests? 



##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -498,11 +502,24 @@ def expand(self, pvalueish):
               spec=beam_runner_api_pb2.FunctionSpec(
                   urn=common_urns.primitives.IMPULSE.urn),
               outputs={'out': transform_proto.inputs[tag]}))
+    output_coder = None

Review Comment:
   Nit: output_coder*s*. 





[GitHub] [beam] ihji commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1099661085

   Run Java PreCommit




[GitHub] [beam] ihji commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1099586721

   Run Java PreCommit




[GitHub] [beam] ihji commented on pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17280:
URL: https://github.com/apache/beam/pull/17280#issuecomment-1101831266

   Run Java PreCommit

