Posted to dev@beam.apache.org by Ke Wu <ke...@gmail.com> on 2021/10/04 18:18:43 UTC

Reshuffle Discrepancy in Classic vs Portable Pipeline

Hello All, 

Recent Samza Runner test failures in python/xlang [1][2] reveal an interesting fact: the Reshuffle transform in a classic pipeline requires its input to be KV, while in a portable pipeline it does not. In portable mode, Reshuffle has an extra step that appends a random key [3].

This suggests that Reshuffle in classic mode is, sort of, equivalent to ReshufflePerKey in portable mode rather than to Reshuffle itself. A couple of questions on this:

1. Is such an SDK/API discrepancy expected?
2. If yes, what is the advised approach for runners to implement translators for such transforms?
3. If no, is this something we can improve?

Best,
Ke


[1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/
[2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/
[3] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
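
To make the observation concrete, here is a minimal plain-Python model (not Beam code) of the random-key step mentioned for portable mode: each element is paired with a random key, grouped by that key, and then unwrapped, which is why the input does not need to be KV. The function name `reshuffle_via_random_key` and the `num_keys` and `seed` parameters are assumptions made for illustration only.

```python
import random
from collections import defaultdict


def reshuffle_via_random_key(elements, num_keys=4, seed=None):
    """Model of an unkeyed reshuffle: add a random key, group, drop the key."""
    rng = random.Random(seed)
    # Step 1: pair every element with a random key (elements may be any type).
    keyed = [(rng.randrange(num_keys), element) for element in elements]
    # Step 2: group by the random key (a stand-in for a GroupByKey).
    groups = defaultdict(list)
    for key, element in keyed:
        groups[key].append(element)
    # Step 3: drop the random key and re-emit the elements.
    return [element for key in sorted(groups) for element in groups[key]]
```

The output contains the same elements as the input, possibly in a different order; only the grouping used in between changes.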

Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

Posted by Robert Bradshaw <ro...@google.com>.
This is definitely a bug. I'm surprised we haven't run into this
before. (I suppose per-key shuffling is a valid, if possibly
inefficient, replacement for the keyless variant.) I would be in favor
of making two new, unambiguous URNs and deprecating the existing one
that has multiple interpretations.

On Mon, Oct 4, 2021 at 2:59 PM Ke Wu <ke...@gmail.com> wrote:
>
> Thanks for the confirmation.
>
> For translation purpose, I think the issue is that
>
> “beam:transform:reshuffle:v1” corresponds to Java Reshuffle.of() and Python Reshuffle(), where one is expecting KV but not the other.
>
>
> Ideally, it should be [Java Reshuffle.of() and Python ReshufflePerKey()] or [Java Reshuffle.viaRandomKey() and Python Reshuffle()]. In addition, there could be another Urn to represent the other pair. e.g. "beam:transform:reshuffle_per_key:v1” or “beam:transform:reshuffle_via_random_key:v1"
>
> Any thoughts on this?
>
> Best,
> Ke
>
>
> On Oct 4, 2021, at 2:43 PM, Robert Bradshaw <ro...@google.com> wrote:
>
> Oh, yes.
>
> Java Reshuffle.of() = Python ReshufflePerKey()
> Java Reshuffle.viaRandomKey() == Python Reshuffle()
>
> We generally try to avoid this kind of discrepancy. It could make
> sense to rename Reshuffle.of() to Reshuffle.viaKey().
>
> On Mon, Oct 4, 2021 at 2:33 PM Ke Wu <ke...@gmail.com> wrote:
>
>
> I should have said that the discrepancy lives in the SDKs, not in classic vs. portable.
>
> Correct me if I am wrong, Reshuffle transform in Java SDK requires the input to be KV [1] while Reshuffle in Python [2] and Go [3] does not.
>
>
> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L53
> [2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
> [3] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/gbk.go#L122
>
> On Oct 4, 2021, at 12:09 PM, Robert Bradshaw <ro...@google.com> wrote:
>
> Reshuffle is not keyed, there is a separate reshuffle-per-key for
> that. This is true for both Java and Python. This shouldn't depend on
> classic vs. portable mode. It sounds like there's an issue in
> translation.
>
> On Mon, Oct 4, 2021 at 11:18 AM Ke Wu <ke...@gmail.com> wrote:
>
>
> Hello All,
>
> Recent Samza Runner tests failure in python/xlang [1][2] reveals an interesting fact that Reshuffle Transform in classic pipeline requires the input to be KV while portable pipeline does not, where Reshuffle in portable mode it has an extra step to append a random key [3].
>
> This suggests that Reshuffle in classic mode is, sort of, equivalent to ReshufflePerKey in potable mode instead of Reshuffle itself. Couple of questions on this:
>
> 1. Is such SDK/API discrepancy expected?
> 2. If Yes, then, what are the advised approach for runners to implement translators for such transforms?
> 3. If No, is this something we can improve?
>
> Best,
> Ke
>
>
> [1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/
> [2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/
> [3] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>
>
>

Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

Posted by Ke Wu <ke...@gmail.com>.
Thanks for the confirmation.

For translation purposes, I think the issue is that

"beam:transform:reshuffle:v1" corresponds to both Java Reshuffle.of() and Python Reshuffle(), where the former expects KV input but the latter does not.

Ideally, it should correspond to either [Java Reshuffle.of() and Python ReshufflePerKey()] or [Java Reshuffle.viaRandomKey() and Python Reshuffle()]. In addition, there could be another URN to represent the other pair, e.g. "beam:transform:reshuffle_per_key:v1" or "beam:transform:reshuffle_via_random_key:v1".
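
As a rough sketch of how unambiguous URNs could look from a translator's point of view, the snippet below records the pairing described above in a lookup table. The URN strings are copied from this thread (the two new ones are only proposals), and `requires_kv_input` is a hypothetical helper, not part of any Beam API.

```python
# URN strings are copied from this thread; the two new ones are only proposals.
EXISTING_URN = "beam:transform:reshuffle:v1"
PER_KEY_URN = "beam:transform:reshuffle_per_key:v1"
VIA_RANDOM_KEY_URN = "beam:transform:reshuffle_via_random_key:v1"

# Which SDK-level transform each proposed URN would stand for.
PROPOSED_MAPPING = {
    PER_KEY_URN: {"java": "Reshuffle.of()", "python": "ReshufflePerKey()"},
    VIA_RANDOM_KEY_URN: {"java": "Reshuffle.viaRandomKey()", "python": "Reshuffle()"},
}


def requires_kv_input(urn):
    """Hypothetical helper: does a translator for this URN need KV input?"""
    if urn == EXISTING_URN:
        # The existing URN is keyed in Java but unkeyed in Python.
        raise ValueError("ambiguous URN: keyed in Java, unkeyed in Python")
    if urn not in PROPOSED_MAPPING:
        raise KeyError(urn)
    return urn == PER_KEY_URN
```

With one URN per variant, a runner no longer has to guess from the producing SDK whether the input is keyed.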

Any thoughts on this?

Best,
Ke


> On Oct 4, 2021, at 2:43 PM, Robert Bradshaw <ro...@google.com> wrote:
> 
> Oh, yes.
> 
> Java Reshuffle.of() = Python ReshufflePerKey()
> Java Reshuffle.viaRandomKey() == Python Reshuffle()
> 
> We generally try to avoid this kind of discrepancy. It could make
> sense to rename Reshuffle.of() to Reshuffle.viaKey().
> 
> On Mon, Oct 4, 2021 at 2:33 PM Ke Wu <ke...@gmail.com> wrote:
>> 
>> I should have said that the descrepency lives in SDK not Class vs Portable.
>> 
>> Correct me if I am wrong, Reshuffle transform in Java SDK requires the input to be KV [1] while Reshuffle in Python [2] and Go [3] does not.
>> 
>> 
>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L53
>> [2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>> [3] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/gbk.go#L122
>> 
>> On Oct 4, 2021, at 12:09 PM, Robert Bradshaw <ro...@google.com> wrote:
>> 
>> Reshuffle is not keyed, there is a separate reshuffle-per-key for
>> that. This is true for both Java and Python. This shouldn't depend on
>> classic vs. portable mode. It sounds like there's an issue in
>> translation.
>> 
>> On Mon, Oct 4, 2021 at 11:18 AM Ke Wu <ke...@gmail.com> wrote:
>> 
>> 
>> Hello All,
>> 
>> Recent Samza Runner tests failure in python/xlang [1][2] reveals an interesting fact that Reshuffle Transform in classic pipeline requires the input to be KV while portable pipeline does not, where Reshuffle in portable mode it has an extra step to append a random key [3].
>> 
>> This suggests that Reshuffle in classic mode is, sort of, equivalent to ReshufflePerKey in potable mode instead of Reshuffle itself. Couple of questions on this:
>> 
>> 1. Is such SDK/API discrepancy expected?
>> 2. If Yes, then, what are the advised approach for runners to implement translators for such transforms?
>> 3. If No, is this something we can improve?
>> 
>> Best,
>> Ke
>> 
>> 
>> [1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/
>> [2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/
>> [3] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>> 
>> 


Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

Posted by Robert Bradshaw <ro...@google.com>.
On Wed, Oct 6, 2021 at 10:07 AM Luke Cwik <lc...@google.com> wrote:
>
> I believe the intent was for the existing reshuffle URN to represent the keyed variant. This was since the unkeyed reshuffle was a composite built on top of the keyed reshuffle in the Java SDK. The existing overrides in Flink/Spark/Samza confirm this.

And from the Python side I thought the intent was for the reshuffle
URN to represent the unkeyed variant, as the keyed one isn't anything
novel--essentially GBK followed by a trivial FlatMap--as evidenced by
the code :). The composite implementation of doing this with random
keys was just an implementation detail for runners that didn't have a
more efficient variant.
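
For readers less familiar with the keyed variant, a minimal plain-Python model of "GBK followed by a trivial FlatMap" might look like this. It is not the Beam implementation; `reshuffle_per_key` is just an illustrative name.

```python
from collections import defaultdict


def reshuffle_per_key(kvs):
    """Model of a keyed reshuffle: group by the user's key, then flatten."""
    # GroupByKey stand-in: each input element must be a (key, value) pair.
    grouped = defaultdict(list)
    for key, value in kvs:
        grouped[key].append(value)
    # Trivial FlatMap stand-in: re-emit one (key, value) pair per value.
    return [(key, value) for key, values in grouped.items() for value in values]
```

The result holds the same pairs as the input, with pairs sharing a key now adjacent, which is all the keyed variant adds over a plain GroupByKey.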

> Thinking about this more, I wish we had gone only with the unkeyed variant, as I don't know how much benefit users get from having their records grouped by the key they choose, and it also greatly limits the runner's options for optimizing how the data is materialized.

+1

> The only usage of the keyed Reshuffle in the Java SDK is for writing files with a single key, and that use case would benefit from being replaced with GroupIntoBatches instead.
>
>
> On Mon, Oct 4, 2021 at 6:31 PM Robert Burke <ro...@frantil.com> wrote:
>>
>> I can handle the Go SDK change once the urn is decided. I'm cleaning up a change to add the combine_global urn in the Go SDK so this can slip in along side it.
>>
>> On Mon, Oct 4, 2021, 3:57 PM Ke Wu <ke...@gmail.com> wrote:
>>>
>>> Created https://issues.apache.org/jira/browse/BEAM-12999
>>>
>>> On Oct 4, 2021, at 3:37 PM, Robert Bradshaw <ro...@google.com> wrote:
>>>
>>> Thanks. Happy to help with Python/Go. Do you want to create a JIRA?
>>>
>>> On Mon, Oct 4, 2021 at 3:33 PM Ke Wu <ke...@gmail.com> wrote:
>>>
>>>
>>> Let me add two new urns representing reshuffle via random key and reshuffle using key. I will share the PR later here, would need some help on Python/Go SDK changes too since I am not very familiar with them.
>>>
>>> Best,
>>> Ke
>>>
>>>
>>> On Oct 4, 2021, at 3:11 PM, Robert Bradshaw <ro...@google.com> wrote:
>>>
>>> On Mon, Oct 4, 2021 at 3:08 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>
>>> On 10/4/21 11:43 PM, Robert Bradshaw wrote:
>>>
>>> Oh, yes.
>>>
>>> Java Reshuffle.of() = Python ReshufflePerKey()
>>> Java Reshuffle.viaRandomKey() == Python Reshuffle()
>>>
>>> We generally try to avoid this kind of discrepancy. It could make
>>> sense to rename Reshuffle.of() to Reshuffle.viaKey().
>>>
>>>
>>> I'd suggest Reshuffle.usingKey(), but I'm not native speaker, so that
>>> might be opinionated.
>>>
>>>
>>> usingKey does sound better. (And, FWIW, usingRandomKey() sounds better
>>> to me than viaRandomKey(), but probably not worth changing so the
>>> question becomes whether to be stilted or consistent.)
>>>
>>> More importantly - could we undeprecate Reshuffle
>>> (in Java SDK)? I believe it was deprecated for wrong reasons - yes, it
>>> has undocumented and non-portable side-effects, but it still makes sense
>>> for various use-cases (e.g. fan-out, or SDF).
>>>
>>>
>>> +1
>>>
>>>
>>> On Mon, Oct 4, 2021 at 2:33 PM Ke Wu <ke...@gmail.com> wrote:
>>>
>>> I should have said that the descrepency lives in SDK not Class vs Portable.
>>>
>>> Correct me if I am wrong, Reshuffle transform in Java SDK requires the input to be KV [1] while Reshuffle in Python [2] and Go [3] does not.
>>>
>>>
>>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L53
>>> [2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>>> [3] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/gbk.go#L122
>>>
>>> On Oct 4, 2021, at 12:09 PM, Robert Bradshaw <ro...@google.com> wrote:
>>>
>>> Reshuffle is not keyed, there is a separate reshuffle-per-key for
>>> that. This is true for both Java and Python. This shouldn't depend on
>>> classic vs. portable mode. It sounds like there's an issue in
>>> translation.
>>>
>>> On Mon, Oct 4, 2021 at 11:18 AM Ke Wu <ke...@gmail.com> wrote:
>>>
>>>
>>> Hello All,
>>>
>>> Recent Samza Runner tests failure in python/xlang [1][2] reveals an interesting fact that Reshuffle Transform in classic pipeline requires the input to be KV while portable pipeline does not, where Reshuffle in portable mode it has an extra step to append a random key [3].
>>>
>>> This suggests that Reshuffle in classic mode is, sort of, equivalent to ReshufflePerKey in potable mode instead of Reshuffle itself. Couple of questions on this:
>>>
>>> 1. Is such SDK/API discrepancy expected?
>>> 2. If Yes, then, what are the advised approach for runners to implement translators for such transforms?
>>> 3. If No, is this something we can improve?
>>>
>>> Best,
>>> Ke
>>>
>>>
>>> [1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/
>>> [2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/
>>> [3] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>>>
>>>
>>>

Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

Posted by Ke Wu <ke...@gmail.com>.
@Robert @Luke @Jan, could you help take an early look at https://github.com/apache/beam/pull/15665? There are some test failures still to be resolved, but they are most likely unrelated to the PR, because I observe the same failures across other PRs.



> On Oct 6, 2021, at 11:03 AM, Robert Burke <ro...@frantil.com> wrote:
> 
> The GoSDK handles the urn as unkeyed. 
> 
> That is, reshuffling a PCollection<KV> will ignore the keys, and produce a PCollection<KV<int,KV>> with the random keys. This would split user keys up to multiple partitions. This is the same as though it were unkeyed.
> 
> Doing anything with the user key specifically would seem to me to defeat the point of a reshuffle, vs just using a GBK which would align keys to bundles in it's output.
> 
> 
> On Wed, Oct 6, 2021, 10:54 AM Ke Wu <ke.wu.cs@gmail.com <ma...@gmail.com>> wrote:
>> The only usage of of the keyed Reshuffle in the Java SDK is for write files with a single key and the use case there would benefit from being replaced with GroupIntoBatches instead.
> 
> I think there are more use cases for keyed reshuffle , e.g. in Samza runner, it is also used when we rekeyed elements, in addition, since states are partitioned by key, so it is important to reshuffle after a PCollection is assigned with a different key so elements with the same new key could end up in the same partition.
> 
>> I believe the intent was for the existing reshuffle URN to represent the keyed variant. This was since the unkeyed reshuffle was a composite built on top of the keyed reshuffle in the Java SDK. The existing overrides in Flink/Spark/Samza confirm this.
> 
> I believe so, because all Samza/Flink/Spark ’s Reshuffle translator are authored in Java, which is expecting keyed Reshuffle<K, V>.
> 
>> I believe the intent was for the existing reshuffle URN to represent the keyed variant.
>> And from the Python side I thought the intent was for the reshuffle
>> URN to represent the unkeyed variant, as the keyed one isn't anything
>> novel
> 
> 
> This is exactly what is confusing, the same urn is currently representing keyed reshuffle in Java SDK but unkeyed reshuffle in Python SDK. 
> @Luke do you think it makes since to have two separately Urns representing two different reshuffles? Unkeyed reshuffle is still expected to be a composite transform of keyed transform and runners can decided which (keyed/unkeyd) reshuffle they want to translate.
> 
> Best,
> Ke
> 
>> On Oct 6, 2021, at 10:38 AM, Reuven Lax <relax@google.com <ma...@google.com>> wrote:
>> 
>> I think it's used with the destination as a key, no? In various places Reshuffle is used as a standin for RequiresStableInput
>> 
>> On Wed, Oct 6, 2021 at 10:07 AM Luke Cwik <lcwik@google.com <ma...@google.com>> wrote:
>> I believe the intent was for the existing reshuffle URN to represent the keyed variant. This was since the unkeyed reshuffle was a composite built on top of the keyed reshuffle in the Java SDK. The existing overrides in Flink/Spark/Samza confirm this.
>> 
>> Thinking about this more I wish we had went only with the unkeyed variant as I don't know how much benefit users get from having their records grouped by the key they choose and it also limits the optimization capabilities of the runner a lot as to how to materialize the data.
>> 
>> The only usage of of the keyed Reshuffle in the Java SDK is for write files with a single key and the use case there would benefit from being replaced with GroupIntoBatches instead.
>> 
>> 
>> On Mon, Oct 4, 2021 at 6:31 PM Robert Burke <robert@frantil.com <ma...@frantil.com>> wrote:
>> I can handle the Go SDK change once the urn is decided. I'm cleaning up a change to add the combine_global urn in the Go SDK so this can slip in along side it.
>> 
>> On Mon, Oct 4, 2021, 3:57 PM Ke Wu <ke.wu.cs@gmail.com <ma...@gmail.com>> wrote:
>> Created https://issues.apache.org/jira/browse/BEAM-12999 <https://issues.apache.org/jira/browse/BEAM-12999> 
>> 
>>> On Oct 4, 2021, at 3:37 PM, Robert Bradshaw <robertwb@google.com <ma...@google.com>> wrote:
>>> 
>>> Thanks. Happy to help with Python/Go. Do you want to create a JIRA?
>>> 
>>> On Mon, Oct 4, 2021 at 3:33 PM Ke Wu <ke.wu.cs@gmail.com <ma...@gmail.com>> wrote:
>>>> 
>>>> Let me add two new urns representing reshuffle via random key and reshuffle using key. I will share the PR later here, would need some help on Python/Go SDK changes too since I am not very familiar with them.
>>>> 
>>>> Best,
>>>> Ke
>>>> 
>>>> 
>>>> On Oct 4, 2021, at 3:11 PM, Robert Bradshaw <robertwb@google.com <ma...@google.com>> wrote:
>>>> 
>>>> On Mon, Oct 4, 2021 at 3:08 PM Jan Lukavský <je.ik@seznam.cz <ma...@seznam.cz>> wrote:
>>>> 
>>>> 
>>>> On 10/4/21 11:43 PM, Robert Bradshaw wrote:
>>>> 
>>>> Oh, yes.
>>>> 
>>>> Java Reshuffle.of() = Python ReshufflePerKey()
>>>> Java Reshuffle.viaRandomKey() == Python Reshuffle()
>>>> 
>>>> We generally try to avoid this kind of discrepancy. It could make
>>>> sense to rename Reshuffle.of() to Reshuffle.viaKey().
>>>> 
>>>> 
>>>> I'd suggest Reshuffle.usingKey(), but I'm not native speaker, so that
>>>> might be opinionated.
>>>> 
>>>> 
>>>> usingKey does sound better. (And, FWIW, usingRandomKey() sounds better
>>>> to me than vaiRandomKey(), but probably not worth changing so the
>>>> question becomes whether to be stilted or consistent.)
>>>> 
>>>> More importantly - could we undeprecate Reshuffle
>>>> (in Java SDK)? I believe it was deprecated for wrong reasons - yes, it
>>>> has undocumented and non-portable side-effects, but is still makes sense
>>>> for various use-cases (e.g. fan-out, or SDF).
>>>> 
>>>> 
>>>> +1
>>>> 
>>>> 
>>>> On Mon, Oct 4, 2021 at 2:33 PM Ke Wu <ke.wu.cs@gmail.com <ma...@gmail.com>> wrote:
>>>> 
>>>> I should have said that the descrepency lives in SDK not Class vs Portable.
>>>> 
>>>> Correct me if I am wrong, Reshuffle transform in Java SDK requires the input to be KV [1] while Reshuffle in Python [2] and Go [3] does not.
>>>> 
>>>> 
>>>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L53 <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L53>
>>>> [2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730 <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730>
>>>> [3] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/gbk.go#L122 <https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/gbk.go#L122>
>>>> 
>>>> On Oct 4, 2021, at 12:09 PM, Robert Bradshaw <robertwb@google.com <ma...@google.com>> wrote:
>>>> 
>>>> Reshuffle is not keyed, there is a separate reshuffle-per-key for
>>>> that. This is true for both Java and Python. This shouldn't depend on
>>>> classic vs. portable mode. It sounds like there's an issue in
>>>> translation.
>>>> 
>>>> On Mon, Oct 4, 2021 at 11:18 AM Ke Wu <ke.wu.cs@gmail.com <ma...@gmail.com>> wrote:
>>>> 
>>>> 
>>>> Hello All,
>>>> 
>>>> Recent Samza Runner tests failure in python/xlang [1][2] reveals an interesting fact that Reshuffle Transform in classic pipeline requires the input to be KV while portable pipeline does not, where Reshuffle in portable mode it has an extra step to append a random key [3].
>>>> 
>>>> This suggests that Reshuffle in classic mode is, sort of, equivalent to ReshufflePerKey in potable mode instead of Reshuffle itself. Couple of questions on this:
>>>> 
>>>> 1. Is such SDK/API discrepancy expected?
>>>> 2. If Yes, then, what are the advised approach for runners to implement translators for such transforms?
>>>> 3. If No, is this something we can improve?
>>>> 
>>>> Best,
>>>> Ke
>>>> 
>>>> 
>>>> [1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/ <https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/>
>>>> [2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/ <https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/>
>>>> [3] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730 <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730>
>>>> 
>>>> 
>> 
> 


Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

Posted by Robert Burke <ro...@frantil.com>.
The GoSDK handles the urn as unkeyed.

That is, reshuffling a PCollection<KV> will ignore the keys, and produce a
PCollection<KV<int,KV>> with the random keys. This would split user keys up
to multiple partitions. This is the same as though it were unkeyed.

Doing anything with the user key specifically would seem to me to defeat
the point of a reshuffle, vs just using a GBK which would align keys to
bundles in its output.
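
A small plain-Python illustration of that behavior (not Go SDK code; the helper name and parameters are made up for this sketch): each KV is wrapped as KV<int, KV> under a random outer key, so the user's key plays no role in where an element lands.

```python
import random


def wrap_with_random_keys(kvs, num_keys=3, seed=0):
    """Wrap each (user_key, value) pair as (random_int, (user_key, value))."""
    rng = random.Random(seed)
    return [(rng.randrange(num_keys), kv) for kv in kvs]


wrapped = wrap_with_random_keys([("user", i) for i in range(20)])
# Placement follows the random outer key, so the user key is ignored and
# elements sharing the key "user" can land under different outer keys.
partitions_for_user = {outer for outer, (user_key, _) in wrapped}
```

Here `partitions_for_user` collects every outer key that received an element with the single user key, which is how one user key can be spread over several partitions.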


On Wed, Oct 6, 2021, 10:54 AM Ke Wu <ke...@gmail.com> wrote:

> The only usage of of the keyed Reshuffle in the Java SDK is for write
>> files with a single key and the use case there would benefit from being
>> replaced with GroupIntoBatches instead.
>>
>
> I think there are more use cases for keyed reshuffle , e.g. in Samza
> runner, it is also used when we rekeyed elements, in addition, since states
> are partitioned by key, so it is important to reshuffle after a PCollection
> is assigned with a different key so elements with the same new key could
> end up in the same partition.
>
> I believe the intent was for the existing reshuffle URN to represent the
>> keyed variant. This was since the unkeyed reshuffle was a composite built
>> on top of the keyed reshuffle in the Java SDK. The existing overrides in
>> Flink/Spark/Samza confirm this.
>>
>
> I believe so, because all Samza/Flink/Spark ’s Reshuffle translator are
> authored in Java, which is expecting keyed Reshuffle<K, V>.
>
> I believe the intent was for the existing reshuffle URN to represent the
>> keyed variant.
>>
> And from the Python side I thought the intent was for the reshuffle
> URN to represent the unkeyed variant, as the keyed one isn't anything
> novel
>
>
> This is exactly what is confusing, the same urn is currently representing
> keyed reshuffle in Java SDK but unkeyed reshuffle in Python SDK.
> @Luke do you think it makes since to have two separately Urns representing
> two different reshuffles? Unkeyed reshuffle is still expected to be a
> composite transform of keyed transform and runners can decided which
> (keyed/unkeyd) reshuffle they want to translate.
>
> Best,
> Ke
>
> On Oct 6, 2021, at 10:38 AM, Reuven Lax <re...@google.com> wrote:
>
> I think it's used with the destination as a key, no? In various places
> Reshuffle is used as a standin for RequiresStableInput
>
> On Wed, Oct 6, 2021 at 10:07 AM Luke Cwik <lc...@google.com> wrote:
>
>> I believe the intent was for the existing reshuffle URN to represent the
>> keyed variant. This was since the unkeyed reshuffle was a composite built
>> on top of the keyed reshuffle in the Java SDK. The existing overrides in
>> Flink/Spark/Samza confirm this.
>>
>> Thinking about this more I wish we had went only with the unkeyed variant
>> as I don't know how much benefit users get from having their records
>> grouped by the key they choose and it also limits the optimization
>> capabilities of the runner a lot as to how to materialize the data.
>>
>> The only usage of of the keyed Reshuffle in the Java SDK is for write
>> files with a single key and the use case there would benefit from being
>> replaced with GroupIntoBatches instead.
>>
>>
>> On Mon, Oct 4, 2021 at 6:31 PM Robert Burke <ro...@frantil.com> wrote:
>>
>>> I can handle the Go SDK change once the urn is decided. I'm cleaning up
>>> a change to add the combine_global urn in the Go SDK so this can slip in
>>> along side it.
>>>
>>> On Mon, Oct 4, 2021, 3:57 PM Ke Wu <ke...@gmail.com> wrote:
>>>
>>>> Created https://issues.apache.org/jira/browse/BEAM-12999
>>>>
>>>> On Oct 4, 2021, at 3:37 PM, Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>> Thanks. Happy to help with Python/Go. Do you want to create a JIRA?
>>>>
>>>> On Mon, Oct 4, 2021 at 3:33 PM Ke Wu <ke...@gmail.com> wrote:
>>>>
>>>>
>>>> Let me add two new urns representing reshuffle via random key and
>>>> reshuffle using key. I will share the PR later here, would need some help
>>>> on Python/Go SDK changes too since I am not very familiar with them.
>>>>
>>>> Best,
>>>> Ke
>>>>
>>>>
>>>> On Oct 4, 2021, at 3:11 PM, Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>> On Mon, Oct 4, 2021 at 3:08 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>
>>>> On 10/4/21 11:43 PM, Robert Bradshaw wrote:
>>>>
>>>> Oh, yes.
>>>>
>>>> Java Reshuffle.of() = Python ReshufflePerKey()
>>>> Java Reshuffle.viaRandomKey() == Python Reshuffle()
>>>>
>>>> We generally try to avoid this kind of discrepancy. It could make
>>>> sense to rename Reshuffle.of() to Reshuffle.viaKey().
>>>>
>>>>
>>>> I'd suggest Reshuffle.usingKey(), but I'm not native speaker, so that
>>>> might be opinionated.
>>>>
>>>>
>>>> usingKey does sound better. (And, FWIW, usingRandomKey() sounds better
>>>> to me than vaiRandomKey(), but probably not worth changing so the
>>>> question becomes whether to be stilted or consistent.)
>>>>
>>>> More importantly - could we undeprecate Reshuffle
>>>> (in Java SDK)? I believe it was deprecated for wrong reasons - yes, it
>>>> has undocumented and non-portable side-effects, but is still makes sense
>>>> for various use-cases (e.g. fan-out, or SDF).
>>>>
>>>>
>>>> +1
>>>>
>>>>
>>>> On Mon, Oct 4, 2021 at 2:33 PM Ke Wu <ke...@gmail.com> wrote:
>>>>
>>>> I should have said that the descrepency lives in SDK not Class vs
>>>> Portable.
>>>>
>>>> Correct me if I am wrong, Reshuffle transform in Java SDK requires the
>>>> input to be KV [1] while Reshuffle in Python [2] and Go [3] does not.
>>>>
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L53
>>>> [2]
>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>>>> [3]
>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/gbk.go#L122
>>>>
>>>> On Oct 4, 2021, at 12:09 PM, Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>> Reshuffle is not keyed, there is a separate reshuffle-per-key for
>>>> that. This is true for both Java and Python. This shouldn't depend on
>>>> classic vs. portable mode. It sounds like there's an issue in
>>>> translation.
>>>>
>>>> On Mon, Oct 4, 2021 at 11:18 AM Ke Wu <ke...@gmail.com> wrote:
>>>>
>>>>
>>>> Hello All,
>>>>
>>>> Recent Samza Runner tests failure in python/xlang [1][2] reveals an
>>>> interesting fact that Reshuffle Transform in classic pipeline requires the
>>>> input to be KV while portable pipeline does not, where Reshuffle in
>>>> portable mode it has an extra step to append a random key [3].
>>>>
>>>> This suggests that Reshuffle in classic mode is, sort of, equivalent to
>>>> ReshufflePerKey in potable mode instead of Reshuffle itself. Couple of
>>>> questions on this:
>>>>
>>>> 1. Is such SDK/API discrepancy expected?
>>>> 2. If Yes, then, what are the advised approach for runners to implement
>>>> translators for such transforms?
>>>> 3. If No, is this something we can improve?
>>>>
>>>> Best,
>>>> Ke
>>>>
>>>>
>>>> [1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/
>>>> [2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/
>>>> [3]
>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>>>>
>>>>
>>>>
>>>>
>

Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

Posted by Ke Wu <ke...@gmail.com>.
> The only usage of the keyed Reshuffle in the Java SDK is for writing files with a single key, and that use case would benefit from being replaced with GroupIntoBatches instead.

I think there are more use cases for keyed reshuffle. For example, the Samza runner also uses it when elements are re-keyed: since state is partitioned by key, it is important to reshuffle after a PCollection is assigned a different key, so that elements with the same new key end up in the same partition.
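
A minimal plain-Python sketch of why this matters, assuming simple hash partitioning (this is not Samza or Beam code, and `partition_for` and `shuffle_by_key` are hypothetical names): after re-keying, routing by the new key puts all elements of that key in one partition, where per-key state can see them.

```python
import zlib
from collections import defaultdict


def partition_for(key, num_partitions):
    """Pick a partition from a stable hash of the key (crc32, not hash())."""
    return zlib.crc32(repr(key).encode("utf-8")) % num_partitions


def shuffle_by_key(kvs, num_partitions=4):
    """Route each (key, value) pair to the partition that owns its key."""
    partitions = defaultdict(list)
    for key, value in kvs:
        partitions[partition_for(key, num_partitions)].append((key, value))
    return dict(partitions)


# Re-key by a new field, then shuffle so each new key lives in one partition.
events = [("alice", "US"), ("bob", "DE"), ("carol", "US")]
rekeyed = [(country, name) for name, country in events]
placed = shuffle_by_key(rekeyed)
```

Without the shuffle, "alice" and "carol" would stay wherever their old keys placed them, even though both now carry the key "US".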

> I believe the intent was for the existing reshuffle URN to represent the keyed variant. This was since the unkeyed reshuffle was a composite built on top of the keyed reshuffle in the Java SDK. The existing overrides in Flink/Spark/Samza confirm this.

I believe so, because the Reshuffle translators for Samza/Flink/Spark are all written in Java, which expects the keyed Reshuffle<K, V>.

> I believe the intent was for the existing reshuffle URN to represent the keyed variant.
> And from the Python side I thought the intent was for the reshuffle
> URN to represent the unkeyed variant, as the keyed one isn't anything
> novel


This is exactly what is confusing: the same URN currently represents the keyed reshuffle in the Java SDK but the unkeyed reshuffle in the Python SDK.
@Luke, do you think it makes sense to have two separate URNs representing the two different reshuffles? The unkeyed reshuffle would still be expected to be a composite built on the keyed transform, and runners could decide which (keyed/unkeyed) reshuffle they want to translate.

Best,
Ke

> On Oct 6, 2021, at 10:38 AM, Reuven Lax <re...@google.com> wrote:
> 
> I think it's used with the destination as a key, no? In various places Reshuffle is used as a standin for RequiresStableInput
> 
> On Wed, Oct 6, 2021 at 10:07 AM Luke Cwik <lcwik@google.com <ma...@google.com>> wrote:
> I believe the intent was for the existing reshuffle URN to represent the keyed variant. This was since the unkeyed reshuffle was a composite built on top of the keyed reshuffle in the Java SDK. The existing overrides in Flink/Spark/Samza confirm this.
> 
> Thinking about this more I wish we had went only with the unkeyed variant as I don't know how much benefit users get from having their records grouped by the key they choose and it also limits the optimization capabilities of the runner a lot as to how to materialize the data.
> 
> The only usage of of the keyed Reshuffle in the Java SDK is for write files with a single key and the use case there would benefit from being replaced with GroupIntoBatches instead.
> 
> 
> 


Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

Posted by Reuven Lax <re...@google.com>.
I think it's used with the destination as a key, no? In various places
Reshuffle is used as a stand-in for RequiresStableInput.

On Wed, Oct 6, 2021 at 10:07 AM Luke Cwik <lc...@google.com> wrote:

> I believe the intent was for the existing reshuffle URN to represent the
> keyed variant, since the unkeyed reshuffle was a composite built on top
> of the keyed reshuffle in the Java SDK. The existing overrides in
> Flink/Spark/Samza confirm this.
>
> Thinking about this more, I wish we had gone with only the unkeyed
> variant. I don't know how much benefit users get from having their
> records grouped by the key they choose, and it greatly limits the
> runner's ability to optimize how the data is materialized.
>
> The only usage of the keyed Reshuffle in the Java SDK is for write
> files with a single key, and that use case would benefit from being
> replaced with GroupIntoBatches instead.
>
>

Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

Posted by Luke Cwik <lc...@google.com>.
I believe the intent was for the existing reshuffle URN to represent the
keyed variant, since the unkeyed reshuffle was a composite built on top
of the keyed reshuffle in the Java SDK. The existing overrides in
Flink/Spark/Samza confirm this.

Thinking about this more, I wish we had gone with only the unkeyed
variant. I don't know how much benefit users get from having their
records grouped by the key they choose, and it greatly limits the
runner's ability to optimize how the data is materialized.

The only usage of the keyed Reshuffle in the Java SDK is for write files
with a single key, and that use case would benefit from being replaced
with GroupIntoBatches instead.
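
To make the composite relationship concrete, here is a minimal sketch in plain Python (no Beam dependency) of an unkeyed reshuffle built on top of a keyed one. The names reshuffle_per_key and reshuffle_via_random_key and the num_buckets parameter are illustrative assumptions, not Beam APIs, and grouping a list in memory only stands in for the shuffle a real runner would perform across workers.

```python
import random
from collections import defaultdict


def reshuffle_per_key(pairs):
    """Keyed variant: the input must already consist of (key, value) pairs."""
    groups = defaultdict(list)
    for element in pairs:
        if not (isinstance(element, tuple) and len(element) == 2):
            raise TypeError(f"keyed reshuffle expects (key, value), got {element!r}")
        key, value = element
        groups[key].append(value)
    # Re-emit the pairs; the grouping stands in for the shuffle boundary.
    return [(key, value) for key, values in groups.items() for value in values]


def reshuffle_via_random_key(elements, num_buckets=4, seed=None):
    """Unkeyed variant: a composite that adds a random key, then strips it."""
    rng = random.Random(seed)
    keyed = [(rng.randrange(num_buckets), element) for element in elements]
    return [value for _, value in reshuffle_per_key(keyed)]


records = ["a", "b", "c", "d"]
shuffled = reshuffle_via_random_key(records, seed=0)
```

The unkeyed variant accepts arbitrary elements because it supplies its own key, while the keyed variant rejects anything that is not a pair. That is the input-shape mismatch this thread is about.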


On Mon, Oct 4, 2021 at 6:31 PM Robert Burke <ro...@frantil.com> wrote:

> I can handle the Go SDK change once the URN is decided. I'm cleaning up a
> change to add the combine_global URN in the Go SDK, so this can slip in
> alongside it.
>

Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

Posted by Robert Burke <ro...@frantil.com>.
I can handle the Go SDK change once the URN is decided. I'm cleaning up a
change to add the combine_global URN in the Go SDK, so this can slip in
alongside it.

On Mon, Oct 4, 2021, 3:57 PM Ke Wu <ke...@gmail.com> wrote:

> Created https://issues.apache.org/jira/browse/BEAM-12999
>

Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

Posted by Ke Wu <ke...@gmail.com>.
Created https://issues.apache.org/jira/browse/BEAM-12999

> On Oct 4, 2021, at 3:37 PM, Robert Bradshaw <ro...@google.com> wrote:
> 
> Thanks. Happy to help with Python/Go. Do you want to create a JIRA?
> 


Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

Posted by Robert Bradshaw <ro...@google.com>.
Thanks. Happy to help with Python/Go. Do you want to create a JIRA?

On Mon, Oct 4, 2021 at 3:33 PM Ke Wu <ke...@gmail.com> wrote:
>
> Let me add two new URNs, one representing reshuffle via random key and one representing reshuffle using key. I will share the PR here later; I would need some help with the Python/Go SDK changes too, since I am not very familiar with them.
>
> Best,
> Ke
>
>

Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

Posted by Ke Wu <ke...@gmail.com>.
Let me add two new URNs, one representing reshuffle via random key and one representing reshuffle using key. I will share the PR here later; I would need some help with the Python/Go SDK changes too, since I am not very familiar with them.

Best,
Ke
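
The translation problem and the proposed fix can be sketched as a runner-side URN lookup. This is plain Python rather than runner code: beam:transform:reshuffle:v1 is the existing URN mentioned earlier in the thread, while the two replacement URN strings and the helper names are hypothetical placeholders, since the final names had not been decided at this point.

```python
# Existing URN: ambiguous, since Java emits it for the keyed variant
# and Python emits it for the unkeyed one.
EXISTING_URN = "beam:transform:reshuffle:v1"
# Hypothetical placeholder names for the two proposed URNs.
PER_KEY_URN = "example:reshuffle_per_key:v1"
RANDOM_KEY_URN = "example:reshuffle_via_random_key:v1"


def is_kv(element):
    """Rough stand-in for a KV type check."""
    return isinstance(element, tuple) and len(element) == 2


def translate(urn, sample_element):
    """Return which reshuffle a runner would apply, or raise on invalid input."""
    if urn in (EXISTING_URN, PER_KEY_URN):
        # Per the thread, existing runner overrides treat the current URN as keyed.
        if not is_kv(sample_element):
            raise TypeError(f"{urn} expects KV input, got {sample_element!r}")
        return "keyed"
    if urn == RANDOM_KEY_URN:
        return "unkeyed"
    raise KeyError(f"unknown URN: {urn}")
```

With a single shared URN, a non-KV element coming from an unkeyed reshuffle hits the keyed translation and fails. With two distinct URNs, the runner can pick the right translation without inspecting the input.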


> On Oct 4, 2021, at 3:11 PM, Robert Bradshaw <ro...@google.com> wrote:
> 
> On Mon, Oct 4, 2021 at 3:08 PM Jan Lukavský <je.ik@seznam.cz> wrote:
>> 
>> On 10/4/21 11:43 PM, Robert Bradshaw wrote:
>>> Oh, yes.
>>> 
>>> Java Reshuffle.of() = Python ReshufflePerKey()
>>> Java Reshuffle.viaRandomKey() == Python Reshuffle()
>>> 
>>> We generally try to avoid this kind of discrepancy. It could make
>>> sense to rename Reshuffle.of() to Reshuffle.viaKey().
>> 
>> I'd suggest Reshuffle.usingKey(), but I'm not a native speaker, so that
>> might be opinionated.
> 
> usingKey does sound better. (And, FWIW, usingRandomKey() sounds better
> to me than viaRandomKey(), but probably not worth changing, so the
> question becomes whether to be stilted or consistent.)
> 
>> More importantly - could we undeprecate Reshuffle
>> (in the Java SDK)? I believe it was deprecated for the wrong reasons - yes, it
>> has undocumented and non-portable side-effects, but it still makes sense
>> for various use-cases (e.g. fan-out, or SDF).
> 
> +1
> 
>>> 


Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

Posted by Robert Bradshaw <ro...@google.com>.
On Mon, Oct 4, 2021 at 3:08 PM Jan Lukavský <je...@seznam.cz> wrote:
>
> On 10/4/21 11:43 PM, Robert Bradshaw wrote:
> > Oh, yes.
> >
> > Java Reshuffle.of() = Python ReshufflePerKey()
> > Java Reshuffle.viaRandomKey() == Python Reshuffle()
> >
> > We generally try to avoid this kind of discrepancy. It could make
> > sense to rename Reshuffle.of() to Reshuffle.viaKey().
>
> I'd suggest Reshuffle.usingKey(), but I'm not a native speaker, so that
> might be opinionated.

usingKey does sound better. (And, FWIW, usingRandomKey() sounds better
to me than viaRandomKey(), but probably not worth changing, so the
question becomes whether to be stilted or consistent.)

> More importantly - could we undeprecate Reshuffle
> (in the Java SDK)? I believe it was deprecated for the wrong reasons - yes, it
> has undocumented and non-portable side-effects, but it still makes sense
> for various use-cases (e.g. fan-out, or SDF).

+1

> >

Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

Posted by Jan Lukavský <je...@seznam.cz>.
On 10/4/21 11:43 PM, Robert Bradshaw wrote:
> Oh, yes.
>
> Java Reshuffle.of() = Python ReshufflePerKey()
> Java Reshuffle.viaRandomKey() == Python Reshuffle()
>
> We generally try to avoid this kind of discrepancy. It could make
> sense to rename Reshuffle.of() to Reshuffle.viaKey().

I'd suggest Reshuffle.usingKey(), but I'm not a native speaker, so that
might be opinionated. More importantly - could we undeprecate Reshuffle
(in the Java SDK)? I believe it was deprecated for the wrong reasons - yes, it
has undocumented and non-portable side-effects, but it still makes sense
for various use-cases (e.g. fan-out, or SDF).

  Jan

>
> On Mon, Oct 4, 2021 at 2:33 PM Ke Wu <ke...@gmail.com> wrote:
>> I should have said that the descrepency lives in SDK not Class vs Portable.
>>
>> Correct me if I am wrong, Reshuffle transform in Java SDK requires the input to be KV [1] while Reshuffle in Python [2] and Go [3] does not.
>>
>>
>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L53
>> [2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>> [3] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/gbk.go#L122
>>
>> On Oct 4, 2021, at 12:09 PM, Robert Bradshaw <ro...@google.com> wrote:
>>
>> Reshuffle is not keyed, there is a separate reshuffle-per-key for
>> that. This is true for both Java and Python. This shouldn't depend on
>> classic vs. portable mode. It sounds like there's an issue in
>> translation.
>>
>> On Mon, Oct 4, 2021 at 11:18 AM Ke Wu <ke...@gmail.com> wrote:
>>
>>
>> Hello All,
>>
>> Recent Samza Runner test failures in python/xlang [1][2] reveal an interesting fact: the Reshuffle transform in a classic pipeline requires the input to be KV, while in a portable pipeline it does not, because Reshuffle in portable mode has an extra step that appends a random key [3].
>>
>> This suggests that Reshuffle in classic mode is, sort of, equivalent to ReshufflePerKey in portable mode rather than to Reshuffle itself. A couple of questions on this:
>>
>> 1. Is such an SDK/API discrepancy expected?
>> 2. If yes, what is the advised approach for runners to implement translators for such transforms?
>> 3. If no, is this something we can improve?
>>
>> Best,
>> Ke
>>
>>
>> [1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/
>> [2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/
>> [3] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>>
>>

Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

Posted by Robert Bradshaw <ro...@google.com>.
Oh, yes.

Java Reshuffle.of() = Python ReshufflePerKey()
Java Reshuffle.viaRandomKey() == Python Reshuffle()

We generally try to avoid this kind of discrepancy. It could make
sense to rename Reshuffle.of() to Reshuffle.viaKey().
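
To make the mapping above concrete, here is a minimal plain-Python sketch of the two semantics. It does not use Beam and is not how any SDK implements Reshuffle; the function names, the num_buckets parameter, and the seed parameter are all hypothetical and exist only for illustration. The point is that the keyed variant needs (key, value) input, while the keyless variant accepts anything because it attaches a synthetic random key first.

```python
import random
from collections import defaultdict


def reshuffle_per_key(pairs):
    """Keyed variant (illustrative): input must already be (key, value) pairs."""
    groups = defaultdict(list)
    for element in pairs:
        if not (isinstance(element, tuple) and len(element) == 2):
            # Mirrors the kind of failure a KV-only translator hits on non-KV input.
            raise TypeError(f"expected a (key, value) pair, got {element!r}")
        key, value = element
        groups[key].append(value)
    # Re-emit every value together with its original key.
    return [(key, value) for key, values in groups.items() for value in values]


def reshuffle_via_random_key(elements, num_buckets=4, seed=None):
    """Keyless variant (illustrative): pair each element with a random key first."""
    rng = random.Random(seed)
    keyed = [(rng.randrange(num_buckets), element) for element in elements]
    # Reuse the keyed variant, then drop the synthetic key again.
    return [value for _, value in reshuffle_per_key(keyed)]
```

Under these assumptions, reshuffle_via_random_key([1, 2, 3]) returns the same elements in some order, whereas reshuffle_per_key([1, 2, 3]) raises TypeError, which is roughly the shape of the mismatch a runner sees when it translates a keyless Reshuffle as if it were the keyed one.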

On Mon, Oct 4, 2021 at 2:33 PM Ke Wu <ke...@gmail.com> wrote:
>
> I should have said that the discrepancy lives in the SDKs, not in Classic vs Portable.
>
> Correct me if I am wrong, but the Reshuffle transform in the Java SDK requires the input to be KV [1], while Reshuffle in Python [2] and Go [3] does not.
>
>
> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L53
> [2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
> [3] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/gbk.go#L122
>
> On Oct 4, 2021, at 12:09 PM, Robert Bradshaw <ro...@google.com> wrote:
>
> Reshuffle is not keyed, there is a separate reshuffle-per-key for
> that. This is true for both Java and Python. This shouldn't depend on
> classic vs. portable mode. It sounds like there's an issue in
> translation.
>
> On Mon, Oct 4, 2021 at 11:18 AM Ke Wu <ke...@gmail.com> wrote:
>
>
> Hello All,
>
> Recent Samza Runner test failures in python/xlang [1][2] reveal an interesting fact: the Reshuffle transform in a classic pipeline requires the input to be KV, while in a portable pipeline it does not, because Reshuffle in portable mode has an extra step that appends a random key [3].
>
> This suggests that Reshuffle in classic mode is, sort of, equivalent to ReshufflePerKey in portable mode rather than to Reshuffle itself. A couple of questions on this:
>
> 1. Is such an SDK/API discrepancy expected?
> 2. If yes, what is the advised approach for runners to implement translators for such transforms?
> 3. If no, is this something we can improve?
>
> Best,
> Ke
>
>
> [1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/
> [2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/
> [3] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>
>

Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

Posted by Ke Wu <ke...@gmail.com>.
I should have said that the discrepancy lives in the SDKs, not in Classic vs Portable.

Correct me if I am wrong, but the Reshuffle transform in the Java SDK requires the input to be KV [1], while Reshuffle in Python [2] and Go [3] does not.


[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L53
[2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
[3] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/gbk.go#L122

> On Oct 4, 2021, at 12:09 PM, Robert Bradshaw <ro...@google.com> wrote:
> 
> Reshuffle is not keyed, there is a separate reshuffle-per-key for
> that. This is true for both Java and Python. This shouldn't depend on
> classic vs. portable mode. It sounds like there's an issue in
> translation.
> 
> On Mon, Oct 4, 2021 at 11:18 AM Ke Wu <ke...@gmail.com> wrote:
>> 
>> Hello All,
>> 
>> Recent Samza Runner test failures in python/xlang [1][2] reveal an interesting fact: the Reshuffle transform in a classic pipeline requires the input to be KV, while in a portable pipeline it does not, because Reshuffle in portable mode has an extra step that appends a random key [3].
>>
>> This suggests that Reshuffle in classic mode is, sort of, equivalent to ReshufflePerKey in portable mode rather than to Reshuffle itself. A couple of questions on this:
>>
>> 1. Is such an SDK/API discrepancy expected?
>> 2. If yes, what is the advised approach for runners to implement translators for such transforms?
>> 3. If no, is this something we can improve?
>> 
>> Best,
>> Ke
>> 
>> 
>> [1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/
>> [2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/
>> [3] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730


Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

Posted by Robert Bradshaw <ro...@google.com>.
Reshuffle is not keyed, there is a separate reshuffle-per-key for
that. This is true for both Java and Python. This shouldn't depend on
classic vs. portable mode. It sounds like there's an issue in
translation.

On Mon, Oct 4, 2021 at 11:18 AM Ke Wu <ke...@gmail.com> wrote:
>
> Hello All,
>
> Recent Samza Runner test failures in python/xlang [1][2] reveal an interesting fact: the Reshuffle transform in a classic pipeline requires the input to be KV, while in a portable pipeline it does not, because Reshuffle in portable mode has an extra step that appends a random key [3].
>
> This suggests that Reshuffle in classic mode is, sort of, equivalent to ReshufflePerKey in portable mode rather than to Reshuffle itself. A couple of questions on this:
>
> 1. Is such an SDK/API discrepancy expected?
> 2. If yes, what is the advised approach for runners to implement translators for such transforms?
> 3. If no, is this something we can improve?
>
> Best,
> Ke
>
>
> [1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/
> [2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/
> [3] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730