Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/07 00:19:47 UTC

[GitHub] [beam] robertwb commented on a diff in pull request #17280: [BEAM-14251] add output_coder_override to ExpansionRequest

robertwb commented on code in PR #17280:
URL: https://github.com/apache/beam/pull/17280#discussion_r844532488


##########
model/job-management/src/main/proto/beam_expansion_api.proto:
##########
@@ -46,6 +46,9 @@ message ExpansionRequest {
   // A namespace (prefix) to use for the id of any newly created
   // components.
   string namespace = 3;
+
+  // (Optional) If this coder is set, overrides the output type of ExternalTransform
+  org.apache.beam.model.pipeline.v1.Coder output_coder_override = 4;

Review Comment:
   This needs to be a map<tag, Coder> as a transform may have many outputs. 
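To show why a single coder does not fit, here is a minimal Python sketch. It uses a hypothetical in-memory model of the request, not the real generated proto classes. Overrides are keyed by output tag, so only the outputs named in the map change. The names ExpansionRequestSketch, output_coder_overrides and apply_overrides are illustrative only, and the coder URN strings stand in for full Coder messages.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class ExpansionRequestSketch:
    """Hypothetical stand-in for ExpansionRequest with a per-tag override map."""
    namespace: str
    # Maps an output tag to the coder (here just a URN string) requested for it.
    output_coder_overrides: Dict[str, str] = field(default_factory=dict)


def apply_overrides(default_coders: Dict[str, str],
                    request: ExpansionRequestSketch) -> Dict[str, str]:
    """Returns the coder per output tag, preferring any requested override."""
    result = dict(default_coders)  # Copy so the caller's dict is not mutated.
    for tag, coder in request.output_coder_overrides.items():
        # Only tags that the transform actually produces can be overridden.
        if tag in result:
            result[tag] = coder
    return result
```

Unknown tags are silently ignored here. That is a design choice; a stricter version could reject them instead.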



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -113,12 +115,12 @@ private static int getFreshNamespaceIndex() {
 
     public MultiOutputExpandableTransform<InputT> withMultiOutputs() {
       return new MultiOutputExpandableTransform<>(
-          getUrn(), getPayload(), getEndpoint(), getNamespaceIndex());
+          getUrn(), getPayload(), getEndpoint(), getNamespaceIndex(), getOutputCoder());

Review Comment:
   Not sure what this means. Should the coder be used for all outputs? We should probably disallow a non-null value here.
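A minimal sketch of the "disallow non-null" option, written in Python for illustration rather than in the Java of the diff. The class MultiOutputExpandableTransformSketch is hypothetical and only mirrors the constructor arguments shown above. It refuses a single output coder when the transform has multiple outputs, so nothing applies one coder to every output by accident.

```python
from typing import Optional


class MultiOutputExpandableTransformSketch:
    """Hypothetical Python analogue of the Java MultiOutputExpandableTransform."""

    def __init__(self, urn: str, payload: bytes, endpoint: str,
                 namespace_index: int, output_coder: Optional[str] = None):
        # A single coder is ambiguous when there are several outputs, so
        # reject it instead of silently applying it to every output.
        if output_coder is not None:
            raise ValueError(
                "A single output coder cannot be used with multiple outputs; "
                "specify coders per output tag instead.")
        self.urn = urn
        self.payload = payload
        self.endpoint = endpoint
        self.namespace_index = namespace_index
```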



##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -431,10 +431,15 @@ def __init__(self, urn, payload, expansion_service=None):
     self._payload = (
         payload.payload() if isinstance(payload, PayloadBuilder) else payload)
     self._expansion_service = expansion_service
+    self._output_coder = None
     self._external_namespace = self._fresh_namespace()
     self._inputs = {}  # type: Dict[str, pvalue.PCollection]
     self._outputs = {}  # type: Dict[str, pvalue.PCollection]
 
+  def with_output_coder(self, output_coder):

Review Comment:
   PTransforms already have with_output_types. Let's read that and, if it is set, infer the coder from it rather than adding a new method.
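A minimal sketch of the suggestion. It assumes a hand-written lookup table from Python types to standard Beam coder URNs. In the SDK itself, the inference would presumably go through Beam's coder registry instead. infer_output_coder_urn is an illustrative name, not an existing function. A return value of None means "send no override".

```python
from typing import Optional

# Illustrative subset of Python types mapped to standard Beam coder URNs.
_STANDARD_CODER_URNS = {
    bytes: "beam:coder:bytes:v1",
    str: "beam:coder:string_utf8:v1",
    int: "beam:coder:varint:v1",
    float: "beam:coder:double:v1",
    bool: "beam:coder:bool:v1",
}


def infer_output_coder_urn(output_type: Optional[type]) -> Optional[str]:
    """Returns a coder URN for the declared output type, or None.

    None means "no override": either no output type was declared via
    with_output_types, or the type has no standard coder in this table.
    """
    if output_type is None:
        return None
    # Exact-type lookup, so e.g. bool maps to the bool coder, not varint.
    return _STANDARD_CODER_URNS.get(output_type)
```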



##########
sdks/python/apache_beam/runners/portability/expansion_service_test.py:
##########
@@ -270,6 +270,20 @@ def from_runner_api_parameter(unused_ptransform, payload, unused_context):
     return PayloadTransform(payload.decode('ascii'))
 
 
+@ptransform.PTransform.register_urn('sum_without_type', None)
+class SumWithoutTypeTransform(ptransform.PTransform):
+  def expand(self, pcoll):
+    return pcoll | beam.CombineGlobally(sum)

Review Comment:
   Can we really never infer this? It would be better to use a type that we know we will never be able to infer, e.g. one with a data dependency like "Map(lambda x: str_literal if x else int_literal)", which is technically a union type but which we can ensure never actually produces both types.
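The reviewer's example can be sketched in plain Python. The function below returns either a str or an int depending on its input, so its only honest static result type is Union[str, int]. A test whose inputs are all truthy, however, only ever produces str values. The literals stand in for the placeholder names str_literal and int_literal, and to_value is a hypothetical name.

```python
from typing import Union, get_args


def to_value(x):
    # Which type is returned depends on the data, not on the code.
    return "str_literal" if x else 0


# The best a static analysis can say about to_value's result.
DECLARED_RESULT = Union[str, int]

# A test fixture that only contains truthy inputs never reaches the int branch.
TEST_INPUTS = [1, 2, 3]
observed_types = {type(to_value(x)) for x in TEST_INPUTS}
```

Here get_args(DECLARED_RESULT) contains both str and int, while observed_types is just {str}. A test can then override the output coder with a str coder that is known to be correct.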



##########
sdks/python/apache_beam/runners/portability/expansion_service.py:
##########
@@ -63,6 +64,10 @@ def with_pipeline(component, pcoll_id=None):
       }
       transform = with_pipeline(
           ptransform.PTransform.from_runner_api(request.transform, context))
+      if request.output_coder_override.spec.urn:
+        output_coder = Coder.from_runner_api(
+            request.output_coder_override, context)
+        transform = transform.with_output_types(output_coder.to_type_hint())

Review Comment:
   This could be unsafe. Should we at least check that the requested coder is compatible with the transform's actual output? (That check would have to happen after the transform is applied.)
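One possible shape for such a check, sketched under assumptions. After application, the type the SDK inferred for the output is compared with the type implied by the requested coder. An unknown (typing.Any) inferred type is accepted, because that is the case the override exists for. A concrete inferred type must be a subclass of the requested type. is_override_compatible is a hypothetical helper, and this acceptance rule is one reasonable choice, not an established Beam policy.

```python
from typing import Any


def is_override_compatible(inferred_type: Any, requested_type: type) -> bool:
    """Checks a requested output type against the type inferred after application.

    An unknown (Any) inferred type cannot be checked statically, which is exactly
    the case the override exists for, so it is accepted. A concrete inferred type
    must be a subclass of the requested type so every element can be encoded.
    """
    if inferred_type is Any:
        return True
    # Non-class hints such as Union[str, int] are treated as incompatible.
    return isinstance(inferred_type, type) and issubclass(inferred_type, requested_type)
```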



##########
model/job-management/src/main/proto/beam_expansion_api.proto:
##########
@@ -46,6 +46,9 @@ message ExpansionRequest {
   // A namespace (prefix) to use for the id of any newly created
   // components.
   string namespace = 3;
+
+  // (Optional) If this coder is set, overrides the output type of ExternalTransform

Review Comment:
   Let's make this a request that the outputs have the given coders. This way the expansion service can do its best to satisfy the request, and if it can't, the caller has one more chance to do its best to interpret the results.
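With request semantics, the caller needs to know which requests were not honored. A minimal sketch, assuming both the requested and the actually used coders are available as tag-to-URN maps. unsatisfied_requests is a hypothetical helper, not part of the expansion API.

```python
from typing import Dict


def unsatisfied_requests(requested: Dict[str, str],
                         granted: Dict[str, str]) -> Dict[str, str]:
    """Returns the requested coders that the expansion service did not honor.

    `requested` maps output tags to the coders the caller asked for; `granted`
    maps output tags to the coders actually present in the expanded transform.
    """
    return {
        tag: coder
        for tag, coder in requested.items()
        if granted.get(tag) != coder
    }
```

Any tag left in the result is one the caller still has to interpret or convert itself.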


