Posted to dev@beam.apache.org by Sam Rohde <sr...@google.com> on 2020/05/19 22:45:10 UTC

More metadata in Coder Proto

Hi all,

Should there be more metadata in the Coder proto, for example an
"is_deterministic" boolean field? This would give SDKs a language-agnostic
way to infer properties of a coder received from the expansion service.

My motivation for this is that I recently ran into a problem in which an
"ExternalCoder" in the Python SDK was erroneously marked as
non-deterministic. The reason is that the Coder proto doesn't have an
"is_deterministic" field, so when the coder cannot be recreated in Python,
the ExternalCoder's is_deterministic check defaults to False.
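
To make the failure mode concrete, here is a minimal Python sketch. CoderSpec
and OpaqueCoder are toy stand-ins invented for illustration; they are not the
real Coder proto or the SDK's ExternalCoder, and the is_deterministic field on
the spec is the hypothetical addition being proposed, not something that
exists today.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class CoderSpec:
    """Toy stand-in for the Coder proto; not the real Beam message."""
    urn: str
    # Hypothetical field proposed in this thread; assumed, not an existing field.
    is_deterministic: Optional[bool] = None


class OpaqueCoder:
    """Toy stand-in for a coder the local SDK cannot rebuild from its spec."""

    def __init__(self, spec: CoderSpec):
        self.spec = spec

    def is_deterministic(self) -> bool:
        # With no metadata available, the only safe answer is the
        # conservative default.
        if self.spec.is_deterministic is None:
            return False
        return self.spec.is_deterministic


# The URN below is made up for the example.
no_metadata = CoderSpec(urn="example:coder:java_void:v1")
print(OpaqueCoder(no_metadata).is_deterministic())  # False

with_flag = CoderSpec(urn="example:coder:java_void:v1", is_deterministic=True)
print(OpaqueCoder(with_flag).is_deterministic())  # True
```

The first call reports False regardless of how the coder actually behaves in
the SDK that defined it, which is the mismatch described above.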

Regards,
Sam

Re: More metadata in Coder Proto

Posted by Luke Cwik <lc...@google.com>.
On Wed, May 20, 2020 at 11:09 AM Sam Rohde <sr...@google.com> wrote:

> +Robert Bradshaw <ro...@google.com> who is the reviewer on
> https://github.com/apache/beam/pull/11503. How does that sound to you?
> Skip the "is input deterministic" check for GBKs embedded in x-lang
> transforms?
>
> On Wed, May 20, 2020 at 10:56 AM Sam Rohde <sr...@google.com> wrote:
>
>> Thanks for your comments, here's a little more to the problem I'm working
>> on: I have a PR to make GBK a primitive
>> <https://github.com/apache/beam/pull/11503> and the aforementioned
>> test_combine_globally was check failing in the run_pipeline method of the
>> DataflowRunner.
>> Specifically what is failing is when the DataflowRunner visits each
>> transform, it checks if the GBK has a deterministic input coder. This fails
>> when the GBK is expanded from the expansion service because the resulting
>> ExternalCoder doesn't override the is_deterministic method.
>>
>> This wasn't being hit before because this deterministic input check only
>> occurred during the apply_GroupByKey method. However, I moved it to when
>> the DataflowRunner is creating a V1B3 pipeline during the run_pipeline
>> stage.
>>
>>
>> On Wed, May 20, 2020 at 10:13 AM Luke Cwik <lc...@google.com> wrote:
>>
>>> If the CombineGlobally is being returned by the expansion service, the
>>> expansion service is on the hook for ensuring that intermediate
>>> PCollections/PTransforms/... are constructed correctly.
>>>
>> Okay, this was kind of my hunch. If the DataflowRunner is making sure
>> that the input coder to a GBK is deterministic, then we should skip the
>> check if we receive an x-lang transform (seen in the Python SDK as a
>> RunnerAPITransformHolder).
>>
>>
>>>
>>> I thought this question was about what to do if you want to take the
>>> output of an XLang pipeline and process it through some generic transform
>>> that doesn't care about the types and treats it like an opaque blob (like
>>> the Count transform) and how to make that work when you don't know the
>>> output properties. I don't think anyone has shared a design doc for this
>>> problem that covered the different approaches.
>>>
>> Aside from the DataflowRunner GBK problem, I was also curious if there
>> was any need for metadata around the Coder proto and why there currently is
>> no metadata. If there was more metadata, like an is_deterministic field,
>> then the GBK deterministic input check could also work.
>>
>>
It doesn't exist because there was no reason to expose those properties:
everything happened at pipeline construction time, so all these details
could be held within the SDK. Once the pipeline is converted to proto, the
contract for using the beam:transform:group_by_key:v1 transform is that the
key encoding is deterministic, and it was up to SDKs to perform this
validation. Since pipeline construction has now spilled over to include
transmitting parts of the pipeline in proto form because of how XLang
expansion works, it might be necessary to expose more of these properties,
but this is yet to be designed.
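
As a rough illustration of the SDK-side validation described above, the
sketch below checks only the key coder of a key/value input. ToyCoder,
ToyKvCoder, and validate_gbk_input are hypothetical names for this example
and do not correspond to Beam classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyCoder:
    name: str
    deterministic: bool


@dataclass(frozen=True)
class ToyKvCoder:
    key: ToyCoder
    value: ToyCoder


def validate_gbk_input(coder: ToyKvCoder) -> None:
    """Raise if the key coder cannot guarantee a stable encoding."""
    if not coder.key.deterministic:
        raise ValueError(
            f"GroupByKey requires a deterministic key coder, got {coder.key.name}")


# Only the key encoding is covered by the contract; the value coder
# may be non-deterministic.
ok = ToyKvCoder(ToyCoder("varint", True), ToyCoder("pickle", False))
validate_gbk_input(ok)

bad = ToyKvCoder(ToyCoder("pickle", False), ToyCoder("varint", True))
try:
    validate_gbk_input(bad)
except ValueError as err:
    print(err)
```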


>
>>
>>>
>>> On Tue, May 19, 2020 at 9:47 PM Chamikara Jayalath <ch...@google.com>
>>> wrote:
>>>
>>>> I think you are hitting GroupByKey [1] that is internal to the Java
>>>> CombineGlobally implementation that takes a KV with a Void type (with
>>>> VoidCoder) [2] as input.
>>>>
>>>> ExternalCoder was added to Python SDK to represent coders within
>>>> external transforms that are not standard coders (in this case the
>>>> VoidCoder). This is needed to perform the "pipeline proto -> Python object
>>>> graph -> Dataflow job request" conversion.
>>>>
>>>> Seems like today, a runner is unable to perform this particular
>>>> validation (and maybe others ?) for pipeline segments received through a
>>>> cross-language transform expansion with or without the ExternalCoder. Note
>>>> that a runner is not involved during cross-language transform expansion, so
>>>> pipeline submission is the only location where a runner would get a chance
>>>> to perform this kind of validation for cross-language transforms.
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/2967e3ae513a9bdb13c2da8ffa306fdc092370f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1596
>>>> [2]
>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1172
>>>>
>>>> On Tue, May 19, 2020 at 8:31 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> Since combine globally is a case where you don't need to know what the
>>>>> key or value is and could treat them as bytes allowing you to build and
>>>>> execute this pipeline (assuming you ignored properties such as
>>>>> is_deterministic).
>>>>>
>>>>> Regardless, I still think it makes sense to provide criteria on what
>>>>> your output shape must be during xlang pipeline expansion which is yet to
>>>>> be defined to support such a case. Your suggested solution of adding
>>>>> properties to coders is one possible solution but I think we have to take a
>>>>> step back and consider xlang as a whole since there are still several yet
>>>>> to be solved issues within it.
>>>>>
>>>>>
>>>>> On Tue, May 19, 2020 at 4:56 PM Sam Rohde <sr...@google.com> wrote:
>>>>>
>>>>>> I have a PR that makes GBK a primitive in which the
>>>>>> test_combine_globally
>>>>>> <https://github.com/apache/beam/blob/10dc1bb683aa9c219397cb3474b676a4fbac5a0e/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py#L162>
>>>>>> is failing on the DataflowRunner. In particular, the DataflowRunner runs
>>>>>> over the transform in the run_pipeline method. I moved a method that
>>>>>> verifies that coders as inputs to GBKs are deterministic during this
>>>>>> run_pipeline. Previously, this was during the apply_GroupByKey.
>>>>>>
>>>>>> On Tue, May 19, 2020 at 4:48 PM Brian Hulette <bh...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes I'm unclear on how a PCollection with ExternalCoder made it into
>>>>>>> a downstream transform that enforces is_deterministic. My understanding of
>>>>>>> ExternalCoder (admittedly just based on a quick look at commit history) is
>>>>>>> that it's a shim added so the Python SDK can handle coders that are
>>>>>>> internal to cross-language transforms.
>>>>>>> I think that if the Python SDK is trying to introspect an
>>>>>>> ExternalCoder instance then something is wrong.
>>>>>>>
>>>>>>> Brian
>>>>>>>
>>>>>>> On Tue, May 19, 2020 at 4:01 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>>> I see. The problem is that you are trying to know certain
>>>>>>>> properties of the coder to use in a downstream transform which enforces
>>>>>>>> that it is deterministic like GroupByKey.
>>>>>>>>
>>>>>>>> In all the scenarios so far that I have seen we have required both
>>>>>>>> SDKs to understand the coder, how are you having a cross language pipeline
>>>>>>>> where the downstream SDK doesn't understand the coder and works?
>>>>>>>>
>>>>>>>> Also, an alternative strategy would be to tell the expansion
>>>>>>>> service that you need to choose a coder that is deterministic on the
>>>>>>>> output. This would require building the pipeline and before submission to
>>>>>>>> the job server perform the expansion telling it all the limitations that
>>>>>>>> the SDK has imposed on it.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, May 19, 2020 at 3:45 PM Sam Rohde <sr...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> Should there be more metadata in the Coder Proto? For example,
>>>>>>>>> adding an "is_deterministic" boolean field. This will allow for a
>>>>>>>>> language-agnostic way for SDKs to infer properties about a coder received
>>>>>>>>> from the expansion service.
>>>>>>>>>
>>>>>>>>> My motivation for this is that I recently ran into a problem in
>>>>>>>>> which an "ExternalCoder" in the Python SDK was erroneously marked as
>>>>>>>>> non-deterministic. The reason being is that the Coder proto doesn't have an
>>>>>>>>> "is_deterministic" and when the coder fails to be recreated in Python, the
>>>>>>>>> ExternalCoder defaults to False.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Sam
>>>>>>>>>
>>>>>>>>>

Re: More metadata in Coder Proto

Posted by Kenneth Knowles <ke...@apache.org>.
Thought I'd mention a valuable enhancement that has been proposed a couple
of times: when inferring a coder, choose a deterministic coder when one is
needed. Our current behavior of picking a coder first and then crashing if
we picked the wrong one is suboptimal, for no real reason.

xlang does throw this feature into doubt. I don't see an obvious solution.
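
A minimal sketch of that enhancement, assuming a made-up registry of
candidate coders per type (CANDIDATES, Candidate, and infer_coder are
hypothetical and not part of any SDK): the caller states up front whether
determinism is required, and inference filters on it instead of failing
later.

```python
from typing import Dict, List, NamedTuple


class Candidate(NamedTuple):
    name: str
    deterministic: bool


# Hypothetical registry: candidate coders per type, in order of preference.
CANDIDATES: Dict[type, List[Candidate]] = {
    dict: [Candidate("fast_pickle", False), Candidate("sorted_items", True)],
    int: [Candidate("varint", True)],
}


def infer_coder(py_type: type, need_deterministic: bool) -> Candidate:
    """Pick the preferred coder, restricted to deterministic ones if required."""
    for candidate in CANDIDATES[py_type]:
        if candidate.deterministic or not need_deterministic:
            return candidate
    raise ValueError(f"no deterministic coder registered for {py_type.__name__}")


print(infer_coder(dict, need_deterministic=False).name)  # fast_pickle
print(infer_coder(dict, need_deterministic=True).name)   # sorted_items
```

For a coder that arrives from another SDK through expansion, no such list of
alternatives is available locally, which is where the doubt comes from.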

Kenn

On Wed, May 20, 2020 at 12:10 PM Robert Bradshaw <ro...@google.com>
wrote:

> On Wed, May 20, 2020 at 11:09 AM Sam Rohde <sr...@google.com> wrote:
>
>> +Robert Bradshaw <ro...@google.com> who is the reviewer on
>> https://github.com/apache/beam/pull/11503. How does that sound to you?
>> Skip the "is input deterministic" check for GBKs embedded in x-lang
>> transforms?
>>
>
> Yes, I think this is the right situation in this case. Longer-term, we may
> want to handle cases like
>
> [java produces KVs]
> [python performs GBK]
> [java consumes GBK results]
>
> where properties like this may need to be exposed, but this may also be
> ruled out by rejecting "unknown" coders at the boundaries (rather than ones
> that are entirely internal).
>
>
>> On Wed, May 20, 2020 at 10:56 AM Sam Rohde <sr...@google.com> wrote:
>>
>>> Thanks for your comments, here's a little more to the problem I'm
>>> working on: I have a PR to make GBK a primitive
>>> <https://github.com/apache/beam/pull/11503> and the aforementioned
>>> test_combine_globally was check failing in the run_pipeline method of the
>>> DataflowRunner.
>>> Specifically what is failing is when the DataflowRunner visits each
>>> transform, it checks if the GBK has a deterministic input coder. This fails
>>> when the GBK is expanded from the expansion service because the resulting
>>> ExternalCoder doesn't override the is_deterministic method.
>>>
>>> This wasn't being hit before because this deterministic input check only
>>> occurred during the apply_GroupByKey method. However, I moved it to when
>>> the DataflowRunner is creating a V1B3 pipeline during the run_pipeline
>>> stage.
>>>
>>>
>>> On Wed, May 20, 2020 at 10:13 AM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> If the CombineGlobally is being returned by the expansion service, the
>>>> expansion service is on the hook for ensuring that intermediate
>>>> PCollections/PTransforms/... are constructed correctly.
>>>>
>>> Okay, this was kind of my hunch. If the DataflowRunner is making sure
>>> that the input coder to a GBK is deterministic, then we should skip the
>>> check if we receive an x-lang transform (seen in the Python SDK as a
>>> RunnerAPITransformHolder).
>>>
>>>
>>>>
>>>> I thought this question was about what to do if you want to take the
>>>> output of an XLang pipeline and process it through some generic transform
>>>> that doesn't care about the types and treats it like an opaque blob (like
>>>> the Count transform) and how to make that work when you don't know the
>>>> output properties. I don't think anyone has shared a design doc for this
>>>> problem that covered the different approaches.
>>>>
>>> Aside from the DataflowRunner GBK problem, I was also curious if there
>>> was any need for metadata around the Coder proto and why there currently is
>>> no metadata. If there was more metadata, like an is_deterministic field,
>>> then the GBK deterministic input check could also work.
>>>
>>>
>>>
>>>>
>>>> On Tue, May 19, 2020 at 9:47 PM Chamikara Jayalath <
>>>> chamikara@google.com> wrote:
>>>>
>>>>> I think you are hitting GroupByKey [1] that is internal to the Java
>>>>> CombineGlobally implementation that takes a KV with a Void type (with
>>>>> VoidCoder) [2] as input.
>>>>>
>>>>> ExternalCoder was added to Python SDK to represent coders within
>>>>> external transforms that are not standard coders (in this case the
>>>>> VoidCoder). This is needed to perform the "pipeline proto -> Python object
>>>>> graph -> Dataflow job request" conversion.
>>>>>
>>>>> Seems like today, a runner is unable to perform this particular
>>>>> validation (and maybe others ?) for pipeline segments received through a
>>>>> cross-language transform expansion with or without the ExternalCoder. Note
>>>>> that a runner is not involved during cross-language transform expansion, so
>>>>> pipeline submission is the only location where a runner would get a chance
>>>>> to perform this kind of validation for cross-language transforms.
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/2967e3ae513a9bdb13c2da8ffa306fdc092370f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1596
>>>>> [2]
>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1172
>>>>>
>>>>> On Tue, May 19, 2020 at 8:31 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Since combine globally is a case where you don't need to know what
>>>>>> the key or value is and could treat them as bytes allowing you to build and
>>>>>> execute this pipeline (assuming you ignored properties such as
>>>>>> is_deterministic).
>>>>>>
>>>>>> Regardless, I still think it makes sense to provide criteria on what
>>>>>> your output shape must be during xlang pipeline expansion which is yet to
>>>>>> be defined to support such a case. Your suggested solution of adding
>>>>>> properties to coders is one possible solution but I think we have to take a
>>>>>> step back and consider xlang as a whole since there are still several yet
>>>>>> to be solved issues within it.
>>>>>>
>>>>>>
>>>>>> On Tue, May 19, 2020 at 4:56 PM Sam Rohde <sr...@google.com> wrote:
>>>>>>
>>>>>>> I have a PR that makes GBK a primitive in which the
>>>>>>> test_combine_globally
>>>>>>> <https://github.com/apache/beam/blob/10dc1bb683aa9c219397cb3474b676a4fbac5a0e/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py#L162>
>>>>>>> is failing on the DataflowRunner. In particular, the DataflowRunner runs
>>>>>>> over the transform in the run_pipeline method. I moved a method that
>>>>>>> verifies that coders as inputs to GBKs are deterministic during this
>>>>>>> run_pipeline. Previously, this was during the apply_GroupByKey.
>>>>>>>
>>>>>>> On Tue, May 19, 2020 at 4:48 PM Brian Hulette <bh...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yes I'm unclear on how a PCollection with ExternalCoder made it
>>>>>>>> into a downstream transform that enforces is_deterministic. My
>>>>>>>> understanding of ExternalCoder (admittedly just based on a quick look at
>>>>>>>> commit history) is that it's a shim added so the Python SDK can handle
>>>>>>>> coders that are internal to cross-language transforms.
>>>>>>>> I think that if the Python SDK is trying to introspect an
>>>>>>>> ExternalCoder instance then something is wrong.
>>>>>>>>
>>>>>>>> Brian
>>>>>>>>
>>>>>>>> On Tue, May 19, 2020 at 4:01 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>
>>>>>>>>> I see. The problem is that you are trying to know certain
>>>>>>>>> properties of the coder to use in a downstream transform which enforces
>>>>>>>>> that it is deterministic like GroupByKey.
>>>>>>>>>
>>>>>>>>> In all the scenarios so far that I have seen we have required both
>>>>>>>>> SDKs to understand the coder, how are you having a cross language pipeline
>>>>>>>>> where the downstream SDK doesn't understand the coder and works?
>>>>>>>>>
>>>>>>>>> Also, an alternative strategy would be to tell the expansion
>>>>>>>>> service that you need to choose a coder that is deterministic on the
>>>>>>>>> output. This would require building the pipeline and before submission to
>>>>>>>>> the job server perform the expansion telling it all the limitations that
>>>>>>>>> the SDK has imposed on it.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, May 19, 2020 at 3:45 PM Sam Rohde <sr...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> Should there be more metadata in the Coder Proto? For example,
>>>>>>>>>> adding an "is_deterministic" boolean field. This will allow for a
>>>>>>>>>> language-agnostic way for SDKs to infer properties about a coder received
>>>>>>>>>> from the expansion service.
>>>>>>>>>>
>>>>>>>>>> My motivation for this is that I recently ran into a problem in
>>>>>>>>>> which an "ExternalCoder" in the Python SDK was erroneously marked as
>>>>>>>>>> non-deterministic. The reason being is that the Coder proto doesn't have an
>>>>>>>>>> "is_deterministic" and when the coder fails to be recreated in Python, the
>>>>>>>>>> ExternalCoder defaults to False.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Sam
>>>>>>>>>>
>>>>>>>>>>

Re: More metadata in Coder Proto

Posted by Robert Bradshaw <ro...@google.com>.
On Wed, May 20, 2020 at 11:09 AM Sam Rohde <sr...@google.com> wrote:

> +Robert Bradshaw <ro...@google.com> who is the reviewer on
> https://github.com/apache/beam/pull/11503. How does that sound to you?
> Skip the "is input deterministic" check for GBKs embedded in x-lang
> transforms?
>

Yes, I think this is the right solution in this case. Longer-term, we may
want to handle cases like

[java produces KVs]
[python performs GBK]
[java consumes GBK results]

where properties like this may need to be exposed, but this may also be
ruled out by rejecting "unknown" coders at the boundaries (rather than ones
that are entirely internal).
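
The short-term approach (skipping the check inside cross-language
transforms) might be sketched as below. ToyTransform and failing_gbks are
illustrative names only; the is_external flag stands in for however the SDK
recognizes a transform returned by the expansion service. Rejecting unknown
coders at the boundary is not shown.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyTransform:
    name: str
    is_gbk: bool = False
    key_coder_deterministic: bool = True
    # True for a transform returned by an expansion service (assumed flag).
    is_external: bool = False
    children: List["ToyTransform"] = field(default_factory=list)


def failing_gbks(root: ToyTransform) -> List[str]:
    """Names of GBKs with a non-deterministic key coder, skipping external subtrees."""
    if root.is_external:
        # The expanding SDK is responsible for what it built.
        return []
    failures = []
    if root.is_gbk and not root.key_coder_deterministic:
        failures.append(root.name)
    for child in root.children:
        failures.extend(failing_gbks(child))
    return failures


java_combine = ToyTransform(
    "CombineGlobally",
    is_external=True,
    children=[ToyTransform("InnerGBK", is_gbk=True, key_coder_deterministic=False)],
)
python_gbk = ToyTransform("PyGBK", is_gbk=True, key_coder_deterministic=False)
pipeline = ToyTransform("root", children=[java_combine, python_gbk])

print(failing_gbks(pipeline))  # ['PyGBK']
```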


> On Wed, May 20, 2020 at 10:56 AM Sam Rohde <sr...@google.com> wrote:
>
>> Thanks for your comments, here's a little more to the problem I'm working
>> on: I have a PR to make GBK a primitive
>> <https://github.com/apache/beam/pull/11503> and the aforementioned
>> test_combine_globally was check failing in the run_pipeline method of the
>> DataflowRunner.
>> Specifically what is failing is when the DataflowRunner visits each
>> transform, it checks if the GBK has a deterministic input coder. This fails
>> when the GBK is expanded from the expansion service because the resulting
>> ExternalCoder doesn't override the is_deterministic method.
>>
>> This wasn't being hit before because this deterministic input check only
>> occurred during the apply_GroupByKey method. However, I moved it to when
>> the DataflowRunner is creating a V1B3 pipeline during the run_pipeline
>> stage.
>>
>>
>> On Wed, May 20, 2020 at 10:13 AM Luke Cwik <lc...@google.com> wrote:
>>
>>> If the CombineGlobally is being returned by the expansion service, the
>>> expansion service is on the hook for ensuring that intermediate
>>> PCollections/PTransforms/... are constructed correctly.
>>>
>> Okay, this was kind of my hunch. If the DataflowRunner is making sure
>> that the input coder to a GBK is deterministic, then we should skip the
>> check if we receive an x-lang transform (seen in the Python SDK as a
>> RunnerAPITransformHolder).
>>
>>
>>>
>>> I thought this question was about what to do if you want to take the
>>> output of an XLang pipeline and process it through some generic transform
>>> that doesn't care about the types and treats it like an opaque blob (like
>>> the Count transform) and how to make that work when you don't know the
>>> output properties. I don't think anyone has shared a design doc for this
>>> problem that covered the different approaches.
>>>
>> Aside from the DataflowRunner GBK problem, I was also curious if there
>> was any need for metadata around the Coder proto and why there currently is
>> no metadata. If there was more metadata, like an is_deterministic field,
>> then the GBK deterministic input check could also work.
>>
>>
>>
>>>
>>> On Tue, May 19, 2020 at 9:47 PM Chamikara Jayalath <ch...@google.com>
>>> wrote:
>>>
>>>> I think you are hitting GroupByKey [1] that is internal to the Java
>>>> CombineGlobally implementation that takes a KV with a Void type (with
>>>> VoidCoder) [2] as input.
>>>>
>>>> ExternalCoder was added to Python SDK to represent coders within
>>>> external transforms that are not standard coders (in this case the
>>>> VoidCoder). This is needed to perform the "pipeline proto -> Python object
>>>> graph -> Dataflow job request" conversion.
>>>>
>>>> Seems like today, a runner is unable to perform this particular
>>>> validation (and maybe others ?) for pipeline segments received through a
>>>> cross-language transform expansion with or without the ExternalCoder. Note
>>>> that a runner is not involved during cross-language transform expansion, so
>>>> pipeline submission is the only location where a runner would get a chance
>>>> to perform this kind of validation for cross-language transforms.
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/2967e3ae513a9bdb13c2da8ffa306fdc092370f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1596
>>>> [2]
>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1172
>>>>
>>>> On Tue, May 19, 2020 at 8:31 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> Since combine globally is a case where you don't need to know what the
>>>>> key or value is and could treat them as bytes allowing you to build and
>>>>> execute this pipeline (assuming you ignored properties such as
>>>>> is_deterministic).
>>>>>
>>>>> Regardless, I still think it makes sense to provide criteria on what
>>>>> your output shape must be during xlang pipeline expansion which is yet to
>>>>> be defined to support such a case. Your suggested solution of adding
>>>>> properties to coders is one possible solution but I think we have to take a
>>>>> step back and consider xlang as a whole since there are still several yet
>>>>> to be solved issues within it.
>>>>>
>>>>>
>>>>> On Tue, May 19, 2020 at 4:56 PM Sam Rohde <sr...@google.com> wrote:
>>>>>
>>>>>> I have a PR that makes GBK a primitive in which the
>>>>>> test_combine_globally
>>>>>> <https://github.com/apache/beam/blob/10dc1bb683aa9c219397cb3474b676a4fbac5a0e/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py#L162>
>>>>>> is failing on the DataflowRunner. In particular, the DataflowRunner runs
>>>>>> over the transform in the run_pipeline method. I moved a method that
>>>>>> verifies that coders as inputs to GBKs are deterministic during this
>>>>>> run_pipeline. Previously, this was during the apply_GroupByKey.
>>>>>>
>>>>>> On Tue, May 19, 2020 at 4:48 PM Brian Hulette <bh...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes I'm unclear on how a PCollection with ExternalCoder made it into
>>>>>>> a downstream transform that enforces is_deterministic. My understanding of
>>>>>>> ExternalCoder (admittedly just based on a quick look at commit history) is
>>>>>>> that it's a shim added so the Python SDK can handle coders that are
>>>>>>> internal to cross-language transforms.
>>>>>>> I think that if the Python SDK is trying to introspect an
>>>>>>> ExternalCoder instance then something is wrong.
>>>>>>>
>>>>>>> Brian
>>>>>>>
>>>>>>> On Tue, May 19, 2020 at 4:01 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>>> I see. The problem is that you are trying to know certain
>>>>>>>> properties of the coder to use in a downstream transform which enforces
>>>>>>>> that it is deterministic like GroupByKey.
>>>>>>>>
>>>>>>>> In all the scenarios so far that I have seen we have required both
>>>>>>>> SDKs to understand the coder, how are you having a cross language pipeline
>>>>>>>> where the downstream SDK doesn't understand the coder and works?
>>>>>>>>
>>>>>>>> Also, an alternative strategy would be to tell the expansion
>>>>>>>> service that you need to choose a coder that is deterministic on the
>>>>>>>> output. This would require building the pipeline and before submission to
>>>>>>>> the job server perform the expansion telling it all the limitations that
>>>>>>>> the SDK has imposed on it.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, May 19, 2020 at 3:45 PM Sam Rohde <sr...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> Should there be more metadata in the Coder Proto? For example,
>>>>>>>>> adding an "is_deterministic" boolean field. This will allow for a
>>>>>>>>> language-agnostic way for SDKs to infer properties about a coder received
>>>>>>>>> from the expansion service.
>>>>>>>>>
>>>>>>>>> My motivation for this is that I recently ran into a problem in
>>>>>>>>> which an "ExternalCoder" in the Python SDK was erroneously marked as
>>>>>>>>> non-deterministic. The reason being is that the Coder proto doesn't have an
>>>>>>>>> "is_deterministic" and when the coder fails to be recreated in Python, the
>>>>>>>>> ExternalCoder defaults to False.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Sam
>>>>>>>>>
>>>>>>>>>

Re: More metadata in Coder Proto

Posted by Sam Rohde <sr...@google.com>.
+Robert Bradshaw <ro...@google.com> who is the reviewer on
https://github.com/apache/beam/pull/11503. How does that sound to you? Skip
the "is input deterministic" check for GBKs embedded in x-lang transforms?

On Wed, May 20, 2020 at 10:56 AM Sam Rohde <sr...@google.com> wrote:

> Thanks for your comments, here's a little more to the problem I'm working
> on: I have a PR to make GBK a primitive
> <https://github.com/apache/beam/pull/11503> and the aforementioned
> test_combine_globally was check failing in the run_pipeline method of the
> DataflowRunner.
> Specifically what is failing is when the DataflowRunner visits each
> transform, it checks if the GBK has a deterministic input coder. This fails
> when the GBK is expanded from the expansion service because the resulting
> ExternalCoder doesn't override the is_deterministic method.
>
> This wasn't being hit before because this deterministic input check only
> occurred during the apply_GroupByKey method. However, I moved it to when
> the DataflowRunner is creating a V1B3 pipeline during the run_pipeline
> stage.
>
>
> On Wed, May 20, 2020 at 10:13 AM Luke Cwik <lc...@google.com> wrote:
>
>> If the CombineGlobally is being returned by the expansion service, the
>> expansion service is on the hook for ensuring that intermediate
>> PCollections/PTransforms/... are constructed correctly.
>>
> Okay, this was kind of my hunch. If the DataflowRunner is making sure that
> the input coder to a GBK is deterministic, then we should skip the check if
> we receive an x-lang transform (seen in the Python SDK as a
> RunnerAPITransformHolder).
>
>
>>
>> I thought this question was about what to do if you want to take the
>> output of an XLang pipeline and process it through some generic transform
>> that doesn't care about the types and treats it like an opaque blob (like
>> the Count transform) and how to make that work when you don't know the
>> output properties. I don't think anyone has shared a design doc for this
>> problem that covered the different approaches.
>>
> Aside from the DataflowRunner GBK problem, I was also curious if there was
> any need for metadata around the Coder proto and why there currently is no
> metadata. If there was more metadata, like an is_deterministic field, then
> the GBK deterministic input check could also work.
>
>
>
>>
>> On Tue, May 19, 2020 at 9:47 PM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>> I think you are hitting GroupByKey [1] that is internal to the Java
>>> CombineGlobally implementation that takes a KV with a Void type (with
>>> VoidCoder) [2] as input.
>>>
>>> ExternalCoder was added to Python SDK to represent coders within
>>> external transforms that are not standard coders (in this case the
>>> VoidCoder). This is needed to perform the "pipeline proto -> Python object
>>> graph -> Dataflow job request" conversion.
>>>
>>> Seems like today, a runner is unable to perform this particular
>>> validation (and maybe others ?) for pipeline segments received through a
>>> cross-language transform expansion with or without the ExternalCoder. Note
>>> that a runner is not involved during cross-language transform expansion, so
>>> pipeline submission is the only location where a runner would get a chance
>>> to perform this kind of validation for cross-language transforms.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/2967e3ae513a9bdb13c2da8ffa306fdc092370f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1596
>>> [2]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1172
>>>
>>> On Tue, May 19, 2020 at 8:31 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> Since combine globally is a case where you don't need to know what the
>>>> key or value is and could treat them as bytes allowing you to build and
>>>> execute this pipeline (assuming you ignored properties such as
>>>> is_deterministic).
>>>>
>>>> Regardless, I still think it makes sense to provide criteria on what
>>>> your output shape must be during xlang pipeline expansion which is yet to
>>>> be defined to support such a case. Your suggested solution of adding
>>>> properties to coders is one possible solution but I think we have to take a
>>>> step back and consider xlang as a whole since there are still several yet
>>>> to be solved issues within it.

Re: More metadata in Coder Proto

Posted by Sam Rohde <sr...@google.com>.
Thanks for your comments. Here's a little more on the problem I'm working
on: I have a PR to make GBK a primitive
<https://github.com/apache/beam/pull/11503>, and the aforementioned
test_combine_globally was failing a check in the run_pipeline method of the
DataflowRunner.
Specifically, when the DataflowRunner visits each transform, it checks
whether the GBK has a deterministic input coder. This check fails when the
GBK is expanded from the expansion service because the resulting
ExternalCoder doesn't override the is_deterministic method.

This wasn't being hit before because the deterministic input check only
occurred during the apply_GroupByKey method. However, I moved it to the
point where the DataflowRunner creates a V1B3 pipeline during the
run_pipeline stage.


On Wed, May 20, 2020 at 10:13 AM Luke Cwik <lc...@google.com> wrote:

> If the CombineGlobally is being returned by the expansion service, the
> expansion service is on the hook for ensuring that intermediate
> PCollections/PTransforms/... are constructed correctly.
>
Okay, this was kind of my hunch. If the DataflowRunner is making sure that
the input coder to a GBK is deterministic, then we should skip the check if
we receive an x-lang transform (seen in the Python SDK as a
RunnerAPITransformHolder).
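
To make the proposal concrete, here is a minimal sketch of a GBK input check that skips transforms which came back from the expansion service. It is a toy model, not the DataflowRunner code: the Coder and Transform classes, the `external` flag (standing in for "this is a RunnerAPITransformHolder"), and check_gbk_inputs are all hypothetical names made up for illustration.

```python
from dataclasses import dataclass


# Hypothetical, simplified stand-ins; none of these are Beam SDK classes.
@dataclass(frozen=True)
class Coder:
    name: str
    deterministic: bool


@dataclass(frozen=True)
class Transform:
    label: str
    kind: str  # e.g. "GroupByKey" or "ParDo"
    input_coder: Coder
    external: bool = False  # True if returned by the expansion service


def check_gbk_inputs(transforms):
    """Return the labels of the GBKs that were checked; raise on a failure."""
    checked = []
    for t in transforms:
        if t.kind != "GroupByKey":
            continue
        if t.external:
            # The expansion service is responsible for this subgraph,
            # so the local determinism check is skipped.
            continue
        if not t.input_coder.deterministic:
            raise ValueError(
                f"{t.label}: input coder {t.input_coder.name} "
                "is not deterministic")
        checked.append(t.label)
    return checked


native = Coder("bytes", deterministic=True)
opaque = Coder("opaque-shim", deterministic=False)

pipeline = [
    Transform("Read", "ParDo", native),
    Transform("PythonGBK", "GroupByKey", native),
    Transform("JavaCombine/GBK", "GroupByKey", opaque, external=True),
]
print(check_gbk_inputs(pipeline))
```

With the `external` flag set, only the Python-side GBK is validated; without it, the same pipeline would raise on the GBK whose input coder is the opaque shim.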


>
> I thought this question was about what to do if you want to take the
> output of an XLang pipeline and process it through some generic transform
> that doesn't care about the types and treats it like an opaque blob (like
> the Count transform) and how to make that work when you don't know the
> output properties. I don't think anyone has shared a design doc for this
> problem that covered the different approaches.
>
Aside from the DataflowRunner GBK problem, I was also curious if there was
any need for metadata around the Coder proto and why there currently is no
metadata. If there was more metadata, like an is_deterministic field, then
the GBK deterministic input check could also work.
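
As a sketch of what I mean by metadata: below, a coder spec carries an optional is_deterministic value that an SDK consults only when it cannot instantiate the coder itself. No such field exists on the Coder proto today; CoderSpec, deterministic_for_gbk, and the "example:..." URNs are hypothetical and only illustrate the lookup order.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class CoderSpec:
    """Toy model of a coder description; not the real Coder proto."""
    urn: str
    # Hypothetical metadata field. None means "the producer did not say".
    is_deterministic: Optional[bool] = None


def deterministic_for_gbk(spec, locally_known):
    """Decide whether a coder may be used as a GBK input coder.

    locally_known maps URNs this SDK can instantiate to their determinism.
    """
    if spec.urn in locally_known:
        # The SDK understands the coder, so it can answer for itself.
        return locally_known[spec.urn]
    if spec.is_deterministic is not None:
        # Fall back to what the producing SDK declared.
        return spec.is_deterministic
    # Nothing is known: stay conservative, as the current shim does.
    return False


locally_known = {"beam:coder:bytes:v1": True}
specs = [
    CoderSpec("beam:coder:bytes:v1"),
    CoderSpec("example:coder:void:v1", is_deterministic=True),
    CoderSpec("example:coder:custom:v1"),
]
results = [deterministic_for_gbk(s, locally_known) for s in specs]
print(results)
```

Keeping "unknown" distinct from "false" is a design choice in this sketch; it lets an SDK tell a coder declared non-deterministic apart from one with no declaration at all.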




Re: More metadata in Coder Proto

Posted by Luke Cwik <lc...@google.com>.
If the CombineGlobally is being returned by the expansion service, the
expansion service is on the hook for ensuring that intermediate
PCollections/PTransforms/... are constructed correctly.

I thought this question was about what to do if you want to take the output
of an XLang pipeline and process it through some generic transform that
doesn't care about the types and treats it like an opaque blob (like the
Count transform) and how to make that work when you don't know the output
properties. I don't think anyone has shared a design doc for this problem
that covered the different approaches.


Re: More metadata in Coder Proto

Posted by Chamikara Jayalath <ch...@google.com>.
I think you are hitting GroupByKey [1] that is internal to the Java
CombineGlobally implementation that takes a KV with a Void type (with
VoidCoder) [2] as input.

ExternalCoder was added to the Python SDK to represent coders within
external transforms that are not standard coders (in this case the
VoidCoder). This is needed to perform the "pipeline proto -> Python object
graph -> Dataflow job request" conversion.
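
A rough sketch of that fallback, assuming a simple URN-to-class registry: when a coder proto's URN is not one the SDK knows, a placeholder is created that keeps the proto so it can be passed through unchanged. The classes and the "example:coder:void:v1" URN are hypothetical and are not the actual Python SDK implementation.

```python
from dataclasses import dataclass


# Hypothetical stand-ins for illustration; not the Beam SDK classes.
@dataclass(frozen=True)
class CoderProto:
    urn: str
    payload: bytes = b""


class BytesCoder:
    def is_deterministic(self):
        return True


class OpaqueExternalCoder:
    """Placeholder for a coder that this SDK cannot reconstruct."""

    def __init__(self, proto):
        # Keep the original proto so it can be re-emitted unchanged.
        self.proto = proto

    def is_deterministic(self):
        # The proto carries no determinism information, so the
        # placeholder conservatively answers False.
        return False


KNOWN_CODERS = {"beam:coder:bytes:v1": BytesCoder}


def coder_from_proto(proto):
    """Build a known coder, or fall back to the opaque placeholder."""
    ctor = KNOWN_CODERS.get(proto.urn)
    return ctor() if ctor else OpaqueExternalCoder(proto)


known = coder_from_proto(CoderProto("beam:coder:bytes:v1"))
unknown = coder_from_proto(CoderProto("example:coder:void:v1"))
print(type(known).__name__, known.is_deterministic())
print(type(unknown).__name__, unknown.is_deterministic())
```

The placeholder is enough for the proto round trip, but any code that asks it for properties such as determinism only gets the conservative default.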

It seems that today a runner is unable to perform this particular validation
(and maybe others) for pipeline segments received through a
cross-language transform expansion with or without the ExternalCoder. Note
that a runner is not involved during cross-language transform expansion, so
pipeline submission is the only location where a runner would get a chance
to perform this kind of validation for cross-language transforms.

[1]
https://github.com/apache/beam/blob/2967e3ae513a9bdb13c2da8ffa306fdc092370f0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1596
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1172


Re: More metadata in Coder Proto

Posted by Luke Cwik <lc...@google.com>.
Combine globally is a case where you don't need to know what the key or
value is and could treat them as bytes, which would allow you to build and
execute this pipeline (assuming you ignored properties such as
is_deterministic).
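
To illustrate why is_deterministic still matters once keys are treated as opaque bytes: grouping on encoded bytes only works if equal keys always encode to the same bytes. The sketch below uses plain json from the standard library rather than Beam coders, and the function names are made up for the example.

```python
import json
from collections import defaultdict


def encode_insertion_order(key):
    # Dict entries are serialized in insertion order, so two equal dicts
    # can produce different bytes: a non-deterministic encoding.
    return json.dumps(key).encode("utf-8")


def encode_sorted(key):
    # Sorting the entries makes equal dicts encode to identical bytes.
    return json.dumps(key, sort_keys=True).encode("utf-8")


def group_by_encoded_key(pairs, encode):
    """Group values by the encoded bytes of their key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[encode(key)].append(value)
    return dict(groups)


# The two keys compare equal as Python dicts.
pairs = [({"a": 1, "b": 2}, "x"), ({"b": 2, "a": 1}, "y")]

split = group_by_encoded_key(pairs, encode_insertion_order)
merged = group_by_encoded_key(pairs, encode_sorted)
print(len(split), len(merged))  # prints "2 1"
```

With the insertion-order encoding, the two equal keys land in separate groups, which is the kind of silent error the GBK check is meant to prevent.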

Regardless, I still think it makes sense to provide criteria on what your
output shape must be during xlang pipeline expansion; a way to do this has
yet to be defined. Adding properties to coders, as you suggest, is one
possible solution, but I think we have to take a step back and consider
xlang as a whole, since there are still several unsolved issues within it.



Re: More metadata in Coder Proto

Posted by Sam Rohde <sr...@google.com>.
I have a PR that makes GBK a primitive, in which test_combine_globally
<https://github.com/apache/beam/blob/10dc1bb683aa9c219397cb3474b676a4fbac5a0e/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py#L162>
is failing on the DataflowRunner. In particular, the DataflowRunner visits
each transform in the run_pipeline method. I moved the check that verifies
that input coders to GBKs are deterministic into run_pipeline. Previously,
this check ran during apply_GroupByKey.


Re: More metadata in Coder Proto

Posted by Brian Hulette <bh...@google.com>.
Yes, I'm unclear on how a PCollection with ExternalCoder made it into a
downstream transform that enforces is_deterministic. My understanding of
ExternalCoder (admittedly just based on a quick look at commit history) is
that it's a shim added so the Python SDK can handle coders that are
internal to cross-language transforms.
I think that if the Python SDK is trying to introspect an ExternalCoder
instance then something is wrong.

Brian


Re: More metadata in Coder Proto

Posted by Luke Cwik <lc...@google.com>.
I see. The problem is that you are trying to learn certain properties of the
coder for use in a downstream transform, like GroupByKey, which enforces
that the coder is deterministic.

In all the scenarios I have seen so far, we have required both SDKs to
understand the coder. How do you have a cross-language pipeline where the
downstream SDK doesn't understand the coder and still works?

Also, an alternative strategy would be to tell the expansion service that
you need it to choose a deterministic coder for the output. This would
require building the pipeline and then, before submission to the job server,
performing the expansion while telling the expansion service all the
limitations that the SDK has imposed on it.
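
A minimal sketch of that alternative, assuming the expansion request could carry a set of output requirements: the service picks the first candidate output coder that satisfies them. Nothing like this exists in the expansion API today; ExpansionRequest, output_requirements, the "deterministic_output_coder" string, and the "example:..." URNs are all hypothetical.

```python
from dataclasses import dataclass


# Hypothetical types for illustration; not the Beam expansion API.
@dataclass(frozen=True)
class CoderChoice:
    urn: str
    deterministic: bool


@dataclass(frozen=True)
class ExpansionRequest:
    transform_urn: str
    # Constraints the requesting SDK places on the expanded output.
    output_requirements: frozenset = frozenset()


def choose_output_coder(request, candidates):
    """Pick the first candidate coder that satisfies the request."""
    need_deterministic = (
        "deterministic_output_coder" in request.output_requirements)
    for coder in candidates:
        if need_deterministic and not coder.deterministic:
            continue
        return coder
    raise ValueError("no candidate coder satisfies the requested constraints")


candidates = [
    CoderChoice("example:coder:fast:v1", deterministic=False),
    CoderChoice("example:coder:canonical:v1", deterministic=True),
]

plain = ExpansionRequest("example:transform:v1")
strict = ExpansionRequest(
    "example:transform:v1",
    output_requirements=frozenset({"deterministic_output_coder"}))

print(choose_output_coder(plain, candidates).urn)
print(choose_output_coder(strict, candidates).urn)
```

The requesting SDK would only know which requirements to send once it has seen what consumes the output, which is why the expansion would have to happen after the rest of the pipeline is built.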



