Posted to dev@beam.apache.org by Ke Wu <ke...@gmail.com> on 2021/07/28 03:30:32 UTC

Identify primitive/native transforms in portable pipeline

Hello All,

While looking at translating composite transforms in a portable pipeline, I realized that TrivialNativeTransformExpander [1] is used to identify native transforms by transform urn, and that it removes the sub-transforms and environment id from the corresponding transform node. However, QueryablePipeline seems to identify primitive transforms in a different way [2], which requires us to register runner native transforms again [3][4] in addition to the transform translators.

An idea came to me that we should be able to identify a primitive/native transform by looking at its environment, according to the protobuf model [5]:

// Environment where the current PTransform should be executed in.
//
// Transforms that are required to be implemented by a runner must omit this.
// All other transforms are required to specify this.
string environment_id = 7;

therefore, I updated the logic:

   private static boolean isPrimitiveTransform(PTransform transform) {
     String urn = PTransformTranslation.urnForTransformOrNull(transform);
-    return PRIMITIVE_URNS.contains(urn) || NativeTransforms.isNative(transform);
+    return transform.getEnvironmentId().isEmpty();
   }

However, tests started to fail on SQL cases, where I found that external transforms seem to have an empty environment id as well [6], which does not seem to conform to the protobuf model.
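
To make the mismatch concrete, here is a minimal sketch comparing the environment-id check with a purely structural "has no subtransforms" check. SimpleTransform, EnvironmentCheck, the "java_env" id and the "example:external_composite" urn are hypothetical stand-ins invented for this illustration, not Beam classes or values, and "leaf" here is simplified to "no subtransforms". It only encodes two observations from this thread: Flatten needs no environment while ParDo does, and an external transform can show up with an empty environment id.

```java
import java.util.List;

/** Hypothetical, simplified view of a transform; not Beam's RunnerApi.PTransform. */
final class SimpleTransform {
  final String urn;
  final String environmentId; // empty string means "not set", as with a proto3 string field
  final List<String> subtransformIds;

  SimpleTransform(String urn, String environmentId, List<String> subtransformIds) {
    this.urn = urn;
    this.environmentId = environmentId;
    this.subtransformIds = subtransformIds;
  }
}

final class EnvironmentCheck {
  /** The check proposed in the diff above: an empty environment id means primitive. */
  static boolean isPrimitiveByEnvironment(SimpleTransform t) {
    return t.environmentId.isEmpty();
  }

  /** Simplified structural check: a leaf has no subtransforms. */
  static boolean isLeaf(SimpleTransform t) {
    return t.subtransformIds.isEmpty();
  }

  // Flatten: a leaf that needs no environment.
  static final SimpleTransform FLATTEN =
      new SimpleTransform("beam:transform:flatten:v1", "", List.of());
  // ParDo: a leaf that does need an environment (the id is made up).
  static final SimpleTransform PAR_DO =
      new SimpleTransform("beam:transform:pardo:v1", "java_env", List.of());
  // An external composite that arrives with an empty environment id (hypothetical urn).
  static final SimpleTransform EXTERNAL =
      new SimpleTransform("example:external_composite", "", List.of("sub1", "sub2"));

  public static void main(String[] args) {
    for (SimpleTransform t : List.of(FLATTEN, PAR_DO, EXTERNAL)) {
      System.out.println(
          t.urn + " emptyEnv=" + isPrimitiveByEnvironment(t) + " leaf=" + isLeaf(t));
    }
  }
}
```

In this sketch the two checks agree only for Flatten: ParDo is a leaf with a non-empty environment id, and the external composite has an empty environment id but is not a leaf.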

My questions here are:

1. Is NativeTransforms registration required for a primitive/native transform, in addition to registering it with the translators?
2. Is an empty environment_id a good enough indicator to identify a native/primitive transform?
3. Is an external transform supposed to have an empty or non-empty environment_id?

Best,
Ke


[1] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L44
[2] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java#L186
[3] https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
[4] https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java#L412
[5] https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L194
[6] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py#L392


Re: Identify primitive/native transforms in portable pipeline

Posted by Jan Lukavský <je...@seznam.cz>.
I'll add that there are some issues with the 
TrivialNativeTransformExpander related to SDF/primitive Read expansion. 
The call to 
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary() 
somehow causes the TrivialNativeTransformExpander to return a pipeline 
that then throws an exception when fed to 
GreedyPipelineFuser.fuse(...).toPipeline(). The exception is (for example):

java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output, PCollection=unique_name: "PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
coder_id: "IterableCoder"
is_bounded: UNBOUNDED
windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
}] were consumed but never produced

So far, I have not been able to figure out whether this problem is in the 
TrivialNativeTransformExpander, in the GreedyPipelineFuser, or somewhere 
in between, but the whole thing looks somewhat fragile. If we could 
replace it with something more robust, that would be great.

  Jan

On 8/2/21 8:53 PM, Robert Bradshaw wrote:
> On Mon, Aug 2, 2021 at 10:55 AM Ke Wu <ke...@gmail.com> wrote:
>> IIUC, TrivialNativeTransformExpander was introduced to prune composite nodes with known urns so that their subtransforms and environment ids are eliminated, essentially making each one a leaf node. I believe it is needed because, when traversing a QueryablePipeline, only leaf nodes are returned, the expectation being that only leaf nodes need to be translated directly. So essentially, TrivialNativeTransformExpander is used to turn a composite transform into a leaf node in order to give runners the capability to translate it at runtime.
> So, in short, TrivialNativeTransformExpander is needed to handle the
> implicit expectations of QueryablePipeline. (I will say that sometimes
> it is easier to reason about things like "what is /the/ producer of
> PCollection X" rather than think about the fact that both a "leaf" and
> any number of composites could be considered its producer.)
>
>> I love the idea of relying on transform urns alone to determine primitive transforms, to make things clearer. To do that, I suppose it would be better to provide a unified approach to registering translators, instead of each runner having its own way of doing so. [1][2][3]
>>
>> What are your thoughts?
> Yes, each runner has a set of URNs that it can handle directly. We
> could let this set be the "primitives" or "things to be translated."
> +1 to making this more uniform.
>
> It's possible that a runner should be able to say "sometimes I can
> handle this URN, sometimes I can't and want to fall back to the
> composite implementation" which could complicate this.

Re: Identify primitive/native transforms in portable pipeline

Posted by Robert Bradshaw <ro...@google.com>.
On Mon, Aug 2, 2021 at 10:55 AM Ke Wu <ke...@gmail.com> wrote:
>
> IIUC, TrivialNativeTransformExpander was introduced to prune composite nodes with known urns so that their subtransforms and environment ids are eliminated, essentially making each one a leaf node. I believe it is needed because, when traversing a QueryablePipeline, only leaf nodes are returned, the expectation being that only leaf nodes need to be translated directly. So essentially, TrivialNativeTransformExpander is used to turn a composite transform into a leaf node in order to give runners the capability to translate it at runtime.

So, in short, TrivialNativeTransformExpander is needed to handle the
implicit expectations of QueryablePipeline. (I will say that sometimes
it is easier to reason about things like "what is /the/ producer of
PCollection X" rather than think about the fact that both a "leaf" and
any number of composites could be considered its producer.)

> I love the idea of relying on transform urns alone to determine primitive transforms, to make things clearer. To do that, I suppose it would be better to provide a unified approach to registering translators, instead of each runner having its own way of doing so. [1][2][3]
>
> What are your thoughts?

Yes, each runner has a set of URNs that it can handle directly. We
could let this set be the "primitives" or "things to be translated."
+1 to making this more uniform.

It's possible that a runner should be able to say "sometimes I can
handle this URN, sometimes I can't and want to fall back to the
composite implementation" which could complicate this.
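
As a rough illustration of the "sometimes I can handle this URN" case, the sketch below gives each registered translator a canTranslate check and falls back to the composite's subtransforms when the translator declines. Everything here (Xform, ConditionalTranslator, FallbackWalk, the "payload" field and the "example:" urns) is hypothetical and invented for this example; it is not an existing Beam interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical transform node; "payload" stands in for whatever a runner would inspect. */
final class Xform {
  final String urn;
  final String payload;
  final List<Xform> subtransforms;

  Xform(String urn, String payload, List<Xform> subtransforms) {
    this.urn = urn;
    this.payload = payload;
    this.subtransforms = subtransforms;
  }
}

/** Hypothetical translator that may decline a transform whose URN it is registered for. */
interface ConditionalTranslator {
  boolean canTranslate(Xform transform);

  void translate(Xform transform, List<String> out);
}

final class FallbackWalk {
  static void walk(Xform t, Map<String, ConditionalTranslator> translators, List<String> out) {
    ConditionalTranslator translator = translators.get(t.urn);
    if (translator != null && translator.canTranslate(t)) {
      translator.translate(t, out); // handled directly; subtransforms are ignored
    } else if (!t.subtransforms.isEmpty()) {
      for (Xform child : t.subtransforms) {
        walk(child, translators, out); // fall back to the composite implementation
      }
    } else {
      throw new IllegalStateException("No translation for leaf transform: " + t.urn);
    }
  }

  /** Translates one composite whose native translator accepts only payload "supported". */
  static List<String> demo(String payload) {
    ConditionalTranslator leaf = new ConditionalTranslator() {
      public boolean canTranslate(Xform t) { return true; }
      public void translate(Xform t, List<String> out) { out.add(t.urn); }
    };
    ConditionalTranslator picky = new ConditionalTranslator() {
      public boolean canTranslate(Xform t) { return "supported".equals(t.payload); }
      public void translate(Xform t, List<String> out) { out.add(t.urn + "(native)"); }
    };
    Map<String, ConditionalTranslator> translators = Map.of(
        "example:group", leaf,
        "example:pardo", leaf,
        "example:combine", picky);
    Xform combine = new Xform("example:combine", payload, List.of(
        new Xform("example:group", "", List.of()),
        new Xform("example:pardo", "", List.of())));
    List<String> out = new ArrayList<>();
    walk(combine, translators, out);
    return out;
  }

  public static void main(String[] args) {
    System.out.println(demo("supported"));
    System.out.println(demo("something-else"));
  }
}
```

One consequence of this design is that the set of "primitives" is no longer a fixed set of URNs: whether a node is translated as a unit depends on the individual transform, which is the complication mentioned above.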


Re: Identify primitive/native transforms in portable pipeline

Posted by Ke Wu <ke...@gmail.com>.
IIUC, TrivialNativeTransformExpander was introduced to prune composite nodes with known urns so that their subtransforms and environment ids are eliminated, essentially making each one a leaf node. I believe it is needed because, when traversing a QueryablePipeline, only leaf nodes are returned, the expectation being that only leaf nodes need to be translated directly. So essentially, TrivialNativeTransformExpander is used to turn a composite transform into a leaf node in order to give runners the capability to translate it at runtime.

I love the idea of relying on transform urns alone to determine primitive transforms, to make things clearer. To do that, I suppose it would be better to provide a unified approach to registering translators, instead of each runner having its own way of doing so. [1][2][3]

What are your thoughts?

Best,
Ke


[1] Flink:
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L144
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java#L122
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L145

[2] Spark:
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java#L99
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java#L100

[3] Samza:
https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java#L173

> On Jul 29, 2021, at 1:58 PM, Robert Bradshaw <ro...@google.com> wrote:
> 
> OK, I have to admit that I have no idea what the purpose of
> TrivialNativeTransformExpander is--maybe something else in the
> infrastructure can't handle translating pipelines that have
> subtransforms or something like that? It seems to me the map of urn ->
> PTransformTranslator should be sufficient to do the translation
> (simply ignoring subtransforms for any known URN when walking the
> tree). In this case the whole question and infrastructure around "is
> this primitive" could simply go away.


Re: Identify primitive/native transforms in portable pipeline

Posted by Robert Bradshaw <ro...@google.com>.
OK, I have to admit that I have no idea what the purpose of
TrivialNativeTransformExpander is--maybe something else in the
infrastructure can't handle translating pipelines that have
subtransforms or something like that? It seems to me the map of urn ->
PTransformTranslator should be sufficient to do the translation
(simply ignoring subtransforms for any known URN when walking the
tree). In this case the whole question and infrastructure around "is
this primitive" could simply go away.
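
A minimal sketch of what I mean, using made-up types rather than the real pipeline protos: Node, UrnWalk and the "example:" urns are hypothetical, and the translators are plain Consumer callbacks rather than Beam's PTransformTranslator. The error branch for an unknown leaf is an extra assumption for completeness, and the "outputs are a subset of the inputs" no-op case is left out to keep it short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical, simplified stand-in for a transform node (not Beam's PTransform proto). */
final class Node {
  final String urn;
  final List<Node> subtransforms;

  Node(String urn, List<Node> subtransforms) {
    this.urn = urn;
    this.subtransforms = subtransforms;
  }
}

final class UrnWalk {
  /**
   * Walks the tree. A node whose URN has a registered translator is translated
   * as a unit and its subtransforms are ignored; otherwise the walk recurses.
   */
  static void translate(Node node, Map<String, Consumer<Node>> translators) {
    Consumer<Node> translator = translators.get(node.urn);
    if (translator != null) {
      translator.accept(node); // known URN: do not descend
    } else if (!node.subtransforms.isEmpty()) {
      for (Node child : node.subtransforms) {
        translate(child, translators);
      }
    } else {
      throw new IllegalStateException("Unknown primitive: " + node.urn);
    }
  }

  /** Builds a small tree and returns the URNs that were translated, in visit order. */
  static List<String> demo() {
    List<String> translated = new ArrayList<>();
    Consumer<Node> record = n -> translated.add(n.urn);
    Map<String, Consumer<Node>> translators = Map.of(
        "example:pardo", record,
        "example:native_composite", record);
    Node root = new Node("example:root", List.of(
        new Node("example:pardo", List.of()),
        new Node("example:native_composite",
            List.of(new Node("example:inner", List.of())))));
    translate(root, translators);
    return translated;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Note that "example:inner" has no translator, yet the walk never reaches it because its parent's URN is known. No separate "is this primitive" registration is consulted anywhere.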

On Wed, Jul 28, 2021 at 8:47 PM Ke Wu <ke...@gmail.com> wrote:
>
> Hi Robert,
>
> Thanks for the reply. The motivation for this is that I noticed that when we need to translate a composite transform, there are two steps involved:
>
> 1. Register the composite transform urn with a dedicated translator. [1]
> 2. Register the composite transform urn with @AutoService of NativeTransforms [2]
>
> I was wondering whether step 2 could be eliminated, since after a composite transform urn is registered with a translator, its environment id and subtransforms will be removed from the pipeline components by TrivialNativeTransformExpander [3].
>
> If environment id is not sufficient to distinguish a primitive transform, I suppose we need to keep step 2, unless we update QueryablePipeline to take known urns as an input.
>
> Does this make sense to you?
>
> Best,
> Ke
>
>
> [1] https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L158
> [2] https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
> [3] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L59
>

Re: Identify primitive/native transforms in portable pipeline

Posted by Ke Wu <ke...@gmail.com>.
Hi Robert,

Thanks for the reply. The motivation is that I noticed two steps are involved when we need to translate a composite transform:

1. Register the composite transform urn with a dedicated translator. [1]
2. Register the composite transform urn with @AutoService of NativeTransforms [2]

I was wondering whether step 2 could be eliminated, since after a composite transform urn is registered with a translator, its environment id and subtransforms will be removed from the pipeline components by TrivialNativeTransformExpander [3].

If the environment id is not sufficient to distinguish a primitive transform, I suppose we need to keep step 2, unless we update QueryablePipeline to take known urns as an input.
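
To illustrate the last option, here is a rough sketch, assuming a simplified check that works on URN strings only. KnownUrnPrimitiveCheckSketch and its constructor argument are hypothetical and are not QueryablePipeline's current API; the URN set is an illustrative subset of the model primitives.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

final class KnownUrnPrimitiveCheckSketch {
  // Illustrative subset of Beam's model primitive URNs, not the full PRIMITIVE_URNS set.
  private static final Set<String> MODEL_PRIMITIVE_URNS =
      Collections.unmodifiableSet(
          new HashSet<>(
              Arrays.asList(
                  "beam:transform:pardo:v1",
                  "beam:transform:flatten:v1",
                  "beam:transform:group_by_key:v1",
                  "beam:transform:impulse:v1")));

  private final Set<String> runnerKnownUrns;

  /** runnerKnownUrns: URNs the runner has translators for (hypothetical constructor argument). */
  KnownUrnPrimitiveCheckSketch(Set<String> runnerKnownUrns) {
    this.runnerKnownUrns = new HashSet<>(runnerKnownUrns);
  }

  /** A transform counts as primitive if it is a model primitive or the runner can translate it. */
  boolean isPrimitiveTransform(String urn) {
    return MODEL_PRIMITIVE_URNS.contains(urn) || runnerKnownUrns.contains(urn);
  }
}
```

In this shape the translator map's key set could be passed in directly, so registering a translator would be the only step.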

Does this make sense to you?

Best,
Ke


[1] https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L158
[2] https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
[3] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L59


> On Jul 28, 2021, at 5:52 PM, Robert Bradshaw <ro...@google.com> wrote:
> 
> A composite transform is a transform that either only returns its
> inputs or has subtransforms. A primitive, or leaf, transform would be
> the complement of that. Checking environment ids is not sufficient
> (e.g. the Flatten primitive doesn't need an environment, but the ParDo
> does).
> 
> Perhaps it's worth taking a step back and trying to understand why you
> need to distinguish between primitive and nonprimitive transforms at
> all. When translating a pipeline,
> 
> (1) If you know the URN, you can implement the transform directly,
> (2) If you don't know the URN, look to see if its outputs are a subset
> of the inputs (in which case there's nothing to do) or it has
> subtransforms (in which case you can just recurse to compute the
> outputs).
> (3) If neither (1) nor (2) holds, then you throw an error. This is an
> unknown primitive.
> 


Re: Identify primitive/native transforms in portable pipeline

Posted by Robert Bradshaw <ro...@google.com>.
A composite transform is a transform that either only returns its
inputs or has subtransforms. A primitive, or leaf, transform would be
the complement of that. Checking environment ids is not sufficient
(e.g. the Flatten primitive doesn't need an environment, but the ParDo
does).

Perhaps it's worth taking a step back and trying to understand why you
need to distinguish between primitive and nonprimitive transforms at
all. When translating a pipeline,

(1) If you know the URN, you can implement the transform directly,
(2) If you don't know the URN, look to see if its outputs are a subset
of the inputs (in which case there's nothing to do) or it has
subtransforms (in which case you can just recurse to compute the
outputs).
(3) If neither (1) nor (2) holds, then you throw an error. This is an
unknown primitive.
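
The three cases above can be sketched as a single decision function. This is only an illustration under stated assumptions: Transform and Action are hypothetical simplified types (inputs and outputs are modelled as sets of PCollection ids), not Beam's PTransform proto. One choice here is an assumption of the sketch: subtransforms are checked before the subset test, so that a composite with no outputs is still recursed into.

```java
import java.util.List;
import java.util.Set;

final class TransformClassifierSketch {
  enum Action {
    TRANSLATE_DIRECTLY,
    RECURSE,
    NOTHING_TO_DO
  }

  /** Hypothetical simplified transform: URN, input/output PCollection ids, child ids. */
  static final class Transform {
    final String urn;
    final Set<String> inputs;
    final Set<String> outputs;
    final List<String> subtransforms;

    Transform(String urn, Set<String> inputs, Set<String> outputs, List<String> subtransforms) {
      this.urn = urn;
      this.inputs = inputs;
      this.outputs = outputs;
      this.subtransforms = subtransforms;
    }
  }

  static Action classify(Transform t, Set<String> knownUrns) {
    if (knownUrns.contains(t.urn)) {
      // (1) Known URN: the runner implements this transform directly.
      return Action.TRANSLATE_DIRECTLY;
    }
    if (!t.subtransforms.isEmpty()) {
      // (2) Unknown URN with subtransforms: recurse to compute the outputs.
      return Action.RECURSE;
    }
    if (t.inputs.containsAll(t.outputs)) {
      // (2) Outputs are a subset of the inputs: nothing to do.
      return Action.NOTHING_TO_DO;
    }
    // (3) Neither (1) nor (2): an unknown primitive.
    throw new IllegalArgumentException("Unknown primitive transform with URN: " + t.urn);
  }
}
```

Note that no environment id appears anywhere in the decision; only the URN, the subtransforms, and the input/output PCollections are consulted.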


On Tue, Jul 27, 2021 at 8:30 PM Ke Wu <ke...@gmail.com> wrote:
>
> Hello All,
>
> While looking at translating composite transforms in a portable pipeline, I realized that TrivialNativeTransformExpander [1] is used to identify native transforms by transform urn, and that it removes the sub-transforms and environment id from the corresponding transform node. However, QueryablePipeline seems to identify primitive transforms with a different approach [2], which requires us to register runner native transforms again [3][4] in addition to the transform translators.
>
> It occurred to me that we should be able to identify a primitive/native transform by looking at its environment, according to the protobuf model [5]:
>
> // Environment where the current PTransform should be executed in.
> //
> // Transforms that are required to be implemented by a runner must omit this.
> // All other transforms are required to specify this.
> string environment_id = 7;
>
>
> therefore, I updated the logic:
>
>    private static boolean isPrimitiveTransform(PTransform transform) {
>      String urn = PTransformTranslation.urnForTransformOrNull(transform);
> -    return PRIMITIVE_URNS.contains(urn) || NativeTransforms.isNative(transform);
> +    return transform.getEnvironmentId().isEmpty();
>    }
>
>
> However, tests started to fail on SQL cases, where I found that external transforms seem to have an empty environment id as well [6], which does not seem to conform to the protobuf model.
>
> My questions here are:
>
> 1. Is NativeTransforms required to register a primitive/native transform, in addition to registering it with the translators?
> 2. Is an empty environment_id a good enough indicator to identify a native/primitive transform?
> 3. Is an external transform supposed to have an empty or non-empty environment_id?
>
> Best,
> Ke
>
>
> [1] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L44
> [2] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java#L186
> [3] https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
> [4] https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java#L412
> [5] https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L194
> [6] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py#L392
>