You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by "Gershi, Noam " <no...@citi.com> on 2019/10/02 09:43:44 UTC

RE: Multiple iterations after GroupByKey with SparkRunner

Hi,

Thanx.

Then I will make for now a separate DoFn for each calculation in parallel transformation.

Noam


From: [seznam.cz] Jan Lukavský <je...@seznam.cz>
Sent: Monday, September 30, 2019 11:27 AM
To: user@beam.apache.org
Subject: Re: Multiple iterations after GroupByKey with SparkRunner


Hi,

follow-up discussion on dev@ mailing list more or less suggests that the current behavior of Spark runner is a bug [1]. It will probably be fixed (a consensus about how to fix that has not yet been made), in the meantime you would probably have to use some workaround. Can you describe the actual logic of your pipeline? Maybe we can suggest a way to compute what you need without the necessity of reiterating the grouped elements.

Jan

[1] https://lists.apache.org/thread.html/5e71c98a32bcbc536f55f91f3e81e90ab7384c6ea56bdb30a92fcb31@%3Cdev.beam.apache.org%3E<https://urldefense.proofpoint.com/v2/url?u=https-3A__lists.apache.org_thread.html_5e71c98a32bcbc536f55f91f3e81e90ab7384c6ea56bdb30a92fcb31-40-253Cdev.beam.apache.org-253E&d=DwMDaQ&c=j-EkbjBYwkAB4f8ZbVn1Fw&r=weSBkYYkcVBTPEmCbP7YQPAjEPSbBXyqPYqK6vLTOAM&m=-BKtLodo_tuxYr-5x0CyWd8S0wrWG4echU5ek2AOejo&s=eXZPeBkz5uv3w2PPrjwjoYjYZNeedWpspQ2RG4uQ52M&e=>
On 9/29/19 1:54 PM, Gershi, Noam wrote:
Hi,

Thanx for the reply.

So – re-iteration on grouped elements is a runner-dependent. Flink & DataFlow allows it, while  Spark isn’t.

Since we investigating  here the runners also, Does anyone have a list which runner allow\not-allow re-iteration?

Noam


From: [apache.org<https://urldefense.proofpoint.com/v2/url?u=http-3A__apache.org&d=DwQDaQ&c=j-EkbjBYwkAB4f8ZbVn1Fw&r=weSBkYYkcVBTPEmCbP7YQPAjEPSbBXyqPYqK6vLTOAM&m=-BKtLodo_tuxYr-5x0CyWd8S0wrWG4echU5ek2AOejo&s=t2Shoqj_t6hkunRRnhhQFHc_IJWc9b2PYhJ4zjt9bFg&e=>] Kenneth Knowles <ke...@apache.org>
Sent: Friday, September 27, 2019 7:26 PM
To: dev
Cc: user
Subject: Re: Multiple iterations after GroupByKey with SparkRunner

I am pretty surprised that we do not have a @Category(ValidatesRunner) test in GroupByKeyTest that iterates multiple times. That is a major oversight. We should have this test, and it can be disabled by the SparkRunner's configuration.

Kenn

On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com>> wrote:
The Dataflow version does not spill to disk. However Spark's design might require spilling to disk if you want that to be implemented properly.

On Fri, Sep 27, 2019 at 9:08 AM David Morávek <dm...@apache.org>> wrote:
Hi,

Spark's GBK is currently implemented using `sortBy(key and value).mapPartition(...)` for non-merging windowing in order to support large keys and large scale shuffles. Merging windowing is implemented using standard GBK (underlying spark impl. uses ListCombiner + Hash Grouping), which is by design unable to support large keys.

As Jan noted, problem with mapPartition is, that its UDF receives an Iterator. Only option here is to wrap this iterator to one that spills to disk once an internal buffer is exceeded (the approach suggested by Reuven). This unfortunately comes with a cost in some cases. The best approach would be to somehow determine, that user wants multiple iterations and than wrap it in "re-iterator" if necessary. Does anyone have any ideas how to approach this?

D.

On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com>> wrote:
The Beam API was written to support multiple iterations, and there are definitely transforms that do so. I believe that CoGroupByKey may do this as well with the resulting iterator.

I know that the Dataflow runner is able to handles iterators larger than available memory by paging them in from shuffle, which still allows for reiterating. It sounds like Spark is less flexible here?

Reuven

On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz>> wrote:

+dev <de...@beam.apache.org>

Lukasz, why do you think that users expect to be able to iterate multiple times grouped elements? Besides that it obviously suggests the 'Iterable'? The way that spark behaves is pretty much analogous to how MapReduce used to work - in certain cases it calles repartitionAndSortWithinPartitions and then does mapPartition, which accepts Iterator - that is because internally it merge sorts pre sorted segments. This approach enables to GroupByKey data sets that are too big to fit into memory (per key).

If multiple iterations should be expected by users, we probably should:

 a) include that in @ValidatesRunner tests

 b) store values in memory on spark, which will break for certain pipelines

Because of (b) I think that it would be much better to remove this "expectation" and clearly document that the Iterable is not supposed to be iterated multiple times.

Jan
On 9/27/19 9:27 AM, Jan Lukavský wrote:

I pretty much think so, because that is how Spark works. The Iterable inside is really an Iterator, which cannot be iterated multiple times.

Jan
On 9/27/19 2:00 AM, Lukasz Cwik wrote:
Jan, in Beam users expect to be able to iterate the GBK output multiple times even from within the same ParDo.
Is this something that Beam on Spark Runner never supported?

On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz>> wrote:

Hi Gershi,

could you please outline the pipeline you are trying to execute? Basically, you cannot iterate the Iterable multiple times in single ParDo. It should be possible, though, to apply multiple ParDos to output from GroupByKey.

Jan
On 9/26/19 3:32 PM, Gershi, Noam wrote:
Hi,

I want to iterate multiple times on the Iterable<V> (the output of GroupByKey transformation)
When my Runner is SparkRunner, I get an exception:

Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once,otherwise there could be data lost
                at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java<https://urldefense.proofpoint.com/v2/url?u=http-3A__GroupNonMergingWindowsFunctions.java&d=DwQFaQ&c=j-EkbjBYwkAB4f8ZbVn1Fw&r=weSBkYYkcVBTPEmCbP7YQPAjEPSbBXyqPYqK6vLTOAM&m=fQWOlVJ0K7fMH1V-V2wbcwjQBpBYjRv-8EtoFNYcAZU&s=Ow9AGQZRtbMkhnvxbK4yzNOsKXIBsbO9l-MBvYh_PDs&e=>:221)
                at java.lang.Iterable.spliterator(Iterable.java<https://urldefense.proofpoint.com/v2/url?u=http-3A__Iterable.java&d=DwQFaQ&c=j-EkbjBYwkAB4f8ZbVn1Fw&r=weSBkYYkcVBTPEmCbP7YQPAjEPSbBXyqPYqK6vLTOAM&m=fQWOlVJ0K7fMH1V-V2wbcwjQBpBYjRv-8EtoFNYcAZU&s=p1jPFPtbIpWO-mRa5ACZ-OQD97zpImY-FLSpBnvsqSM&e=>:101)


I understood I can branch the pipeline after GroupByKey into multiple transformation and iterate in each of them once on the Iterable<V>.

Is there a better way for that?


[citi_logo_mail][citi_logo_mail]Noam Gershi
Software Developer
T: +972 (3) 7405718<tel:+972%203-740-5718>
[Mail_signature_blue]