Posted to user@beam.apache.org by "Gershi, Noam " <no...@citi.com> on 2019/10/24 09:46:35 UTC

Iterating filtered Grouped elements on SparkRunner

Hi,

I would like to:

1. Group elements

2. Then filter out some groups

3. Then iterate over the groups that are left and run a calculation on them

When running on Spark, I get an exception:

Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once,otherwise there could be data lost
                at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
                at java.lang.Iterable.spliterator(Iterable.java:101)
                at com.company.Main$2.processElement(Main.java:65)

Code attached


Noam Gershi


Re: Iterating filtered Grouped elements on SparkRunner

Posted by Jan Lukavský <je...@seznam.cz>.
As I said, that is just a hack. If you use the global window, you can
achieve basically the same functionality by first assigning windows to
your elements with something like

   input.apply(Window.into(Sessions.withGapDuration( /* something really huge */ )))

Because session windowing is a merging windowing, this disables the
non-merging grouping optimization, which is what prevents you from
re-iterating the GBK result, and your code should start working.

Jan
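
The re-iteration failure discussed in this message can be modelled in plain Java, without Beam. The sketch below is not the Spark runner's code: OneShotIterable is a hypothetical stand-in for the runner's value iterable, and the size threshold and the sum are assumed placeholders, since the attached pipeline is not part of this thread. It shows why traversing the values twice (here through spliterator(), as in the stack trace) fails, and that copying the values during a single traversal avoids the second call to iterator().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.StreamSupport;

class OneShotIterableDemo {

    /** Hypothetical stand-in for the runner's value iterable: it can be traversed only once. */
    static final class OneShotIterable<T> implements Iterable<T> {
        private final Iterator<T> source;
        private boolean consumed = false;

        OneShotIterable(Iterator<T> source) {
            this.source = source;
        }

        @Override
        public Iterator<T> iterator() {
            if (consumed) {
                throw new IllegalStateException("can't be iterated more than once");
            }
            consumed = true;
            return source;
        }
    }

    /** Counts the values, then sums them: the second spliterator() call asks for a second iterator. */
    static long countThenSum(Iterable<Long> values, long minSize) {
        long count = StreamSupport.stream(values.spliterator(), false).count();
        if (count < minSize) {
            return 0L;
        }
        return StreamSupport.stream(values.spliterator(), false)
                .mapToLong(Long::longValue)
                .sum();
    }

    /** Copies the values in one traversal, then filters and sums using the copy. */
    static long sumIfLargeEnough(Iterable<Long> values, long minSize) {
        List<Long> copy = new ArrayList<>();
        for (Long value : values) { // the only call to values.iterator()
            copy.add(value);
        }
        if (copy.size() < minSize) {
            return 0L;
        }
        return copy.stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        List<Long> data = Arrays.asList(1L, 2L, 3L);
        try {
            countThenSum(new OneShotIterable<>(data.iterator()), 2);
        } catch (IllegalStateException e) {
            System.out.println("second traversal failed: " + e.getMessage());
        }
        System.out.println(sumIfLargeEnough(new OneShotIterable<>(data.iterator()), 2));
    }
}
```

Copying a group trades memory for re-iterability, so it is probably only reasonable when every group is known to be small.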

On 10/24/19 3:08 PM, Gershi, Noam wrote:
>
> Thanks!
>
> I changed it to the split approach (attached), and it works.
>
> Can you explain how to implement it with merging windows (just for my
> own knowledge)?

RE: Iterating filtered Grouped elements on SparkRunner

Posted by "Gershi, Noam " <no...@citi.com>.
Thanks!

I changed it to the split approach (attached), and it works.

Can you explain how to implement it with merging windows (just for my own knowledge)?




Re: Iterating filtered Grouped elements on SparkRunner

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Noam,

we are working towards fixing this bug so that your code will work, but
that will not happen sooner than version 2.18.0 (and I cannot promise even
that :-)). In the meantime, you have several options:

   a) use merging windowing - that will unfortunately mean some
performance penalty and is more of a dirty hack than anything else, but it
might work

   b) split the logic into two parts - one part calculates the cardinality
of each group (you can use Count.perKey [1]); you then join this on the keys
of the GBK result (e.g. with [2] or [3]), filter there using the calculated
cardinality, and calculate your result

Option (b) should actually be more efficient if you have large
groups, because the calculation of the cardinality can be parallelised.

Hope this helps, please feel free to ask any more questions.

Jan

[1] https://beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/transforms/Count.html

[2] https://beam.apache.org/documentation/sdks/java/euphoria/#join

[3] https://beam.apache.org/documentation/sdks/java-extensions/#join-library
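
The data flow of option (b) can be sketched in plain Java on in-memory collections. This is only a model and not Beam code: countPerKey plays the role that Count.perKey would play in a pipeline, groupByKey stands in for the GBK result, and the lookup by key stands in for the join. All class and method names, the minimum-size filter and the sum are assumptions made for illustration, because the attached code is not part of this thread.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CountThenJoinSketch {

    /** Small helper for building key/value pairs. */
    static Map.Entry<String, Long> kv(String key, long value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, Long.valueOf(value));
    }

    /** Branch 1: cardinality per key, computed without touching the grouped values. */
    static Map<String, Long> countPerKey(List<Map.Entry<String, Long>> input) {
        Map<String, Long> counts = new TreeMap<>();
        for (Map.Entry<String, Long> element : input) {
            counts.merge(element.getKey(), 1L, Long::sum);
        }
        return counts;
    }

    /** Branch 2: values grouped by key, standing in for the GBK result. */
    static Map<String, List<Long>> groupByKey(List<Map.Entry<String, Long>> input) {
        Map<String, List<Long>> groups = new TreeMap<>();
        for (Map.Entry<String, Long> element : input) {
            groups.computeIfAbsent(element.getKey(), k -> new ArrayList<>())
                    .add(element.getValue());
        }
        return groups;
    }

    /** Joins both branches on the key, filters on cardinality, then sums each kept group once. */
    static Map<String, Long> sumOfLargeGroups(List<Map.Entry<String, Long>> input, long minSize) {
        Map<String, Long> counts = countPerKey(input);
        Map<String, List<Long>> groups = groupByKey(input);
        Map<String, Long> result = new TreeMap<>();
        for (Map.Entry<String, List<Long>> group : groups.entrySet()) {
            long cardinality = counts.get(group.getKey()); // the join on the key
            if (cardinality < minSize) {
                continue; // the group is dropped without iterating its values
            }
            long sum = 0L;
            for (long value : group.getValue()) { // single traversal of the group
                sum += value;
            }
            result.put(group.getKey(), sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> input =
                Arrays.asList(kv("a", 1), kv("a", 2), kv("a", 3), kv("b", 10));
        System.out.println(sumOfLargeGroups(input, 2));
    }
}
```

Because the filter decision comes from the separately computed count, each surviving group is traversed exactly once, which is the property the failing code lacked.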

