You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Holden Karau <ho...@pigscanfly.ca> on 2018/03/14 23:28:35 UTC

Splittable DoFN in Spark discussion

So we had a quick chat about what it would take to add something like
SplittableDoFns to Spark. I'd done some sketchy thinking about this last
year but didn't get very far.

My back-of-the-envelope design was as follows:
For input type T
Output type V

Implement a mapper which outputs type (T, V)
and if the computation finishes T will be populated otherwise V will be

For determining how long to run we'd up to either K seconds or listen for a
signal on a port

Once we're done running we take the result and filter for the ones with T
and V into seperate collections re-run until finished
and then union the results


This is maybe not a great design but it was minimally complicated and I
figured terrible was a good place to start and improve from.


Let me know your thoughts, especially the parts where this is worse than I
remember because its been awhile since I thought about this.


-- 
Twitter: https://twitter.com/holdenkarau

Re: Splittable DoFN in Spark discussion

Posted by Holden Karau <ho...@pigscanfly.ca>.
That would certainly be good.

On Sun, Mar 25, 2018 at 9:01 PM, Thomas Weise <th...@apache.org> wrote:

> Hopefully the new "continuous processing mode" in Spark will enable SDF
> implementation (and real streaming)?
>
> Thanks,
> Thomas
>
>
> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <ho...@pigscanfly.ca>
> wrote:
>
>>
>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <ho...@pigscanfly.ca>
>>> wrote:
>>>
>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>>
>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <ho...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
>>>>>> kirpichov@google.com> wrote:
>>>>>>
>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>
>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark
>>>>>>>>> runner streaming. Holden, is it correct that Spark appears to have no way
>>>>>>>>> at all to produce an infinite DStream from a finite RDD? Maybe we can
>>>>>>>>> somehow dynamically create a new DStream for every initial restriction,
>>>>>>>>> said DStream being obtained using a Receiver that under the hood actually
>>>>>>>>> runs the SDF? (this is of course less efficient than a timer-capable runner
>>>>>>>>> would do, and I have doubts about the fault tolerance)
>>>>>>>>>
>>>>>>>> So on the streaming side we could simply do it with a fixed number
>>>>>>>> of levels on DStreams. It’s not great but it would work.
>>>>>>>>
>>>>>>> Not sure I understand this. Let me try to clarify what SDF demands
>>>>>>> of the runner. Imagine the following case: a file contains a list of
>>>>>>> "master" Kafka topics, on which there are published additional Kafka topics
>>>>>>> to read.
>>>>>>>
>>>>>>> PCollection<String> masterTopics = TextIO.read().from(masterTopic
>>>>>>> sFile)
>>>>>>> PCollection<String> nestedTopics = masterTopics.apply(ParDo(ReadF
>>>>>>> romKafkaFn))
>>>>>>> PCollection<String> records = nestedTopics.apply(ParDo(ReadF
>>>>>>> romKafkaFn))
>>>>>>>
>>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>>> infinite output for every input:
>>>>>>> - Applying it to a finite set of inputs (in this case to the result
>>>>>>> of reading a text file)
>>>>>>> - Applying it to an infinite set of inputs (i.e. having an unbounded
>>>>>>> number of streams being read concurrently, each of the streams themselves
>>>>>>> is unbounded too)
>>>>>>>
>>>>>>> Does the multi-level solution you have in mind work for this case? I
>>>>>>> suppose the second case is harder, so we can focus on that.
>>>>>>>
>>>>>> So none of those are a splittabledofn right?
>>>>>>
>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>>> splittable DoFn and we're trying to figure out how to make Spark run it.
>>>>>
>>>>>
>>>> Ah ok, sorry I saw that and for some reason parsed them as old style
>>>> DoFns in my head.
>>>>
>>>> To effectively allow us to union back into the “same” DStream  we’d
>>>> have to end up using Sparks queue streams (or their equivalent custom
>>>> source because of some queue stream limitations), which invites some
>>>> reliability challenges. This might be at the point where I should send a
>>>> diagram/some sample code since it’s a bit convoluted.
>>>>
>>>> The more I think about the jumps required to make the “simple” union
>>>> approach work, the more it seems just using the statemapping for steaming
>>>> is probably more reasonable. Although the state tracking in Spark can be
>>>> somewhat expensive so it would probably make sense to benchmark to see if
>>>> it meets our needs.
>>>>
>>> So the problem is, I don't think this can be made to work using
>>> mapWithState. It doesn't allow a mapping function that emits infinite
>>> output for an input element, directly or not.
>>>
>> So, provided there is an infinite input (eg pick a never ending queue
>> stream), and each call produces a finite output, we would have an infinite
>> number of calls.
>>
>>>
>>> Dataflow and Flink, for example, had timer support even before SDFs, and
>>> a timer can set another timer and thus end up doing an infinite amount of
>>> work in a fault tolerant way - so SDF could be implemented on top of that.
>>> But AFAIK spark doesn't have a similar feature, hence my concern.
>>>
>> So we can do an inifinite queue stream which would allow us to be
>> triggered at each interval and handle our own persistence.
>>
>>>
>>>
>>>> But these still are both DStream based rather than Dataset which we
>>>> might want to support (depends on what direction folks take with the
>>>> runners).
>>>>
>>>> If we wanted to do this in the dataset world looking at a custom
>>>> sink/source would also be an option, (which is effectively what a custom
>>>> queue stream like thing for dstreams requires), but the datasource APIs are
>>>> a bit influx so if we ended up doing things at the edge of what’s allowed
>>>> there’s a good chance we’d have to rewrite it a few times.
>>>>
>>>>
>>>>>> Assuming that we have a given dstream though in Spark we can get the
>>>>>> underlying RDD implementation for each microbatch and do our work inside of
>>>>>> that.
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> More generally this does raise an important question if we want to
>>>>>>>> target datasets instead of rdds/DStreams in which case i would need to do
>>>>>>>> some more poking.
>>>>>>>>
>>>>>>>>
>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> How would timers be implemented? By outputing and reprocessing,
>>>>>>>>>> the same way you proposed for SDF?
>>>>>>>>>>
>>>>>>>>> i mean the timers could be inside the mappers within the system.
>>>>>>>> Could use a singleton so if a partition is re-executed it doesn’t end up as
>>>>>>>> a straggler.
>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>
>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Does Spark have support for timers? (I know it has support for
>>>>>>>>>>>> state)
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Could we alternatively use a state mapping function to keep
>>>>>>>>>>>>> track of the computation so far instead of outputting V each time? (also
>>>>>>>>>>>>> the progress so far is probably of a different type R rather than V).
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some sketchy thinking
>>>>>>>>>>>>>> about this last year but didn't get very far.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>>> and if the computation finishes T will be populated otherwise
>>>>>>>>>>>>>> V will be
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For determining how long to run we'd up to either K seconds
>>>>>>>>>>>>>> or listen for a signal on a port
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Once we're done running we take the result and filter for the
>>>>>>>>>>>>>> ones with T and V into seperate collections re-run until finished
>>>>>>>>>>>>>> and then union the results
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>>>> complicated and I figured terrible was a good place to start and improve
>>>>>>>>>>>>>> from.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where this is
>>>>>>>>>>>>>> worse than I remember because its been awhile since I thought about this.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>
>>>>>>> --
>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>
>>>>> --
>>>> Twitter: https://twitter.com/holdenkarau
>>>>
>>> --
>> Twitter: https://twitter.com/holdenkarau
>>
>
>


-- 
Twitter: https://twitter.com/holdenkarau

Re: Splittable DoFN in Spark discussion

Posted by Eugene Kirpichov <ki...@google.com>.
I think this stuff is happening in SparkGroupAlsoByWindowViaWindowSet:
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java#L610

As far as I can tell, there is no infinite stream of pings involved.
However, Spark documentation says under
https://spark.apache.org/docs/latest/streaming-programming-guide.html#updatestatebykey-operation
:
"In every batch, Spark will apply the state update function for all
existing keys, regardless of whether they have new data in a batch or not"

It even provides a way to GC the state - " If the update function returns
None then the key-value pair will be eliminated."

This looks promising. Does Spark streaming always periodically create some
batches, and they just turn out empty if there's no data? If so, then we
probably won't even need an infinite stream of pings.

On Fri, Apr 27, 2018 at 12:14 PM Kenneth Knowles <kl...@google.com> wrote:

> On Fri, Apr 27, 2018 at 12:06 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Fri, Apr 27, 2018 at 11:56 AM Kenneth Knowles <kl...@google.com> wrote:
>>
>> > I'm still pretty shallow on this topic & this thread, so forgive if I'm
>> restating or missing things.
>>
>> > My understanding is that the Spark runner does support Beam's triggering
>> semantics for unbounded aggregations, using the same support code from
>> runners/core that all runners use. Relevant code in SparkTimerInternals
>> and
>> SparkGroupAlsoByWindowViaWindowSet.
>>
>> > IIRC timers are stored in state, scanned each microbatch to see which
>> are
>> eligible.
>>
>> I think the issue (which is more severe in the case of sources) is what to
>> do if no more date comes in to trigger another microbatch.
>>
>
> So will a streaming pipeline fail to trigger in this case? I have this
> feeling the "join with an infinite stream of pings" might already be
> happening.
>
> Kenn
>
>
>
>> > I don't see an immediate barrier to having timer loops. I don't know
>> about performance of this approach, but currently the number of timers per
>> shard (key+window) is bounded by their declarations in code, so it is a
>> tiny number unless codegenerated. We do later want to have dynamic timers
>> (some people call it a TimerMap by analogy with MapState) but I haven't
>> seen a design or even a sketch that I can recall.
>>
>> > Kenn
>>
>> > On Thu, Apr 26, 2018 at 1:48 PM Holden Karau <ho...@pigscanfly.ca>
>> wrote:
>>
>> >> Yeah that's been the implied source of being able to be continuous, you
>> union with a receiver which produce an infinite number of batches (the
>> "never ending queue stream" but not actually a queuestream since they have
>> some limitations but our own implementation there of).
>>
>> >> On Tue, Apr 24, 2018 at 11:54 PM, Reuven Lax <re...@google.com> wrote:
>>
>> >>> Could we do this behind the scenes by writing a Receiver that
>> publishes
>> periodic pings?
>>
>> >>> On Tue, Apr 24, 2018 at 10:09 PM Eugene Kirpichov <
>> kirpichov@google.com>
>> wrote:
>>
>> >>>> Kenn - I'm arguing that in Spark SDF style computation can not be
>> expressed at all, and neither can Beam's timers.
>>
>> >>>> Spark, unlike Flink, does not have a timer facility (only state), and
>> as far as I can tell its programming model has no other primitive that can
>> map a finite RDD into an infinite DStream - the only way to create a new
>> infinite DStream appears to be to write a Receiver.
>>
>> >>>> I cc'd you because I'm wondering whether you've already investigated
>> this when considering whether timers can be implemented on the Spark
>> runner.
>>
>> >>>> On Tue, Apr 24, 2018 at 2:53 PM Kenneth Knowles <kl...@google.com>
>> wrote:
>>
>> >>>>> I don't think I understand what the limitations of timers are that
>> you are referring to. FWIW I would say implementing other primitives like
>> SDF is an explicit non-goal for Beam state & timers.
>>
>> >>>>> I got lost at some point in this thread, but is it actually
>> necessary
>> that a bounded PCollection maps to a finite/bounded structure in Spark?
>> Skimming, I'm not sure if the problem is that we can't transliterate Beam
>> to Spark (this might be a good sign) or that we can't express SDF style
>> computation at all (seems far-fetched, but I could be convinced). Does
>> doing a lightweight analysis and just promoting some things to be some
>> kind
>> of infinite representation help?
>>
>> >>>>> Kenn
>>
>> >>>>> On Tue, Apr 24, 2018 at 2:37 PM Eugene Kirpichov
>> >>>>> <ki...@google.com>
>> wrote:
>>
>> >>>>>> Would like to revive this thread one more time.
>>
>> >>>>>> At this point I'm pretty certain that Spark can't support this out
>> of the box and we're gonna have to make changes to Spark.
>>
>> >>>>>> Holden, could you advise who would be some Spark experts (yourself
>> included :) ) who could advise what kind of Spark change would both
>> support
>> this AND be useful to the regular Spark community (non-Beam) so that it
>> has
>> a chance of finding support? E.g. is there any plan in Spark regarding
>> adding timers similar to Flink's or Beam's timers, maybe we could help out
>> with that?
>>
>> >>>>>> +Kenneth Knowles because timers suffer from the same problem.
>>
>> >>>>>> On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <
>> kirpichov@google.com> wrote:
>>
>> >>>>>>> (resurrecting thread as I'm back from leave)
>>
>> >>>>>>> I looked at this mode, and indeed as Reuven points out it seems
>> that it affects execution details, but doesn't offer any new APIs.
>> >>>>>>> Holden - your suggestions of piggybacking an unbounded-per-element
>> SDF on top of an infinite stream would work if 1) there was just 1 element
>> and 2) the work was guaranteed to be infinite.
>>
>> >>>>>>> Unfortunately, both of these assumptions are insufficient. In
>> particular:
>>
>> >>>>>>> - 1: The SDF is applied to a PCollection; the PCollection itself
>> may be unbounded; and the unbounded work done by the SDF happens for every
>> element. E.g. we might have a Kafka topic on which names of Kafka topics
>> arrive, and we may end up concurrently reading a continuously growing
>> number of topics.
>> >>>>>>> - 2: The work per element is not necessarily infinite, it's just
>> not guaranteed to be finite - the SDF is allowed at any moment to say
>> "Okay, this restriction is done for real" by returning stop() from the
>> @ProcessElement method. Continuing the Kafka example, e.g., it could do
>> that if the topic/partition being watched is deleted. Having an infinite
>> stream as a driver of this process would require being able to send a
>> signal to the stream to stop itself.
>>
>> >>>>>>> Is it looking like there's any other way this can be done in Spark
>> as-is, or are we going to have to make changes to Spark to support this?
>>
>> >>>>>>> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <
>> holden@pigscanfly.ca>
>> wrote:
>>
>> >>>>>>>> I mean the new mode is very much in the Dataset not the DStream
>> API (although you can use the Dataset API with the old modes too).
>>
>> >>>>>>>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com>
>> wrote:
>>
>> >>>>>>>>> But this new mode isn't a semantic change, right? It's moving
>> away from micro batches into something that looks a lot like what Flink
>> does - continuous processing with asynchronous snapshot boundaries.
>>
>> >>>>>>>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <th...@apache.org>
>> wrote:
>>
>> >>>>>>>>>> Hopefully the new "continuous processing mode" in Spark will
>> enable SDF implementation (and real streaming)?
>>
>> >>>>>>>>>> Thanks,
>> >>>>>>>>>> Thomas
>>
>>
>> >>>>>>>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <
>> holden@pigscanfly.ca> wrote:
>>
>>
>> >>>>>>>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <
>> kirpichov@google.com> wrote:
>>
>>
>>
>> >>>>>>>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <
>> holden@pigscanfly.ca> wrote:
>>
>> >>>>>>>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <
>> kirpichov@google.com> wrote:
>>
>> >>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <
>> holden@pigscanfly.ca> wrote:
>>
>> >>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
>> kirpichov@google.com> wrote:
>>
>> >>>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <
>> holden@pigscanfly.ca> wrote:
>>
>> >>>>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>> kirpichov@google.com> wrote:
>>
>> >>>>>>>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk
>> for Spark runner streaming. Holden, is it correct that Spark appears to
>> have no way at all to produce an infinite DStream from a finite RDD? Maybe
>> we can somehow dynamically create a new DStream for every initial
>> restriction, said DStream being obtained using a Receiver that under the
>> hood actually runs the SDF? (this is of course less efficient than a
>> timer-capable runner would do, and I have doubts about the fault
>> tolerance)
>>
>> >>>>>>>>>>>>>>>>> So on the streaming side we could simply do it with a
>> fixed number of levels on DStreams. It’s not great but it would work.
>>
>> >>>>>>>>>>>>>>>> Not sure I understand this. Let me try to clarify what
>> SDF
>> demands of the runner. Imagine the following case: a file contains a list
>> of "master" Kafka topics, on which there are published additional Kafka
>> topics to read.
>>
>> >>>>>>>>>>>>>>>> PCollection<String> masterTopics =
>> TextIO.read().from(masterTopicsFile)
>> >>>>>>>>>>>>>>>> PCollection<String> nestedTopics =
>> masterTopics.apply(ParDo(ReadFromKafkaFn))
>> >>>>>>>>>>>>>>>> PCollection<String> records =
>> nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>
>> >>>>>>>>>>>>>>>> This exemplifies both use cases of a streaming SDF that
>> emits infinite output for every input:
>> >>>>>>>>>>>>>>>> - Applying it to a finite set of inputs (in this case to
>> the result of reading a text file)
>> >>>>>>>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having
>> an
>> unbounded number of streams being read concurrently, each of the streams
>> themselves is unbounded too)
>>
>> >>>>>>>>>>>>>>>> Does the multi-level solution you have in mind work for
>> this case? I suppose the second case is harder, so we can focus on that.
>>
>> >>>>>>>>>>>>>>> So none of those are a splittabledofn right?
>>
>> >>>>>>>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples
>> is
>> a splittable DoFn and we're trying to figure out how to make Spark run it.
>>
>>
>> >>>>>>>>>>>>> Ah ok, sorry I saw that and for some reason parsed them as
>> old style DoFns in my head.
>>
>> >>>>>>>>>>>>> To effectively allow us to union back into the “same”
>> DStream
>>   we’d have to end up using Sparks queue streams (or their equivalent
>> custom
>> source because of some queue stream limitations), which invites some
>> reliability challenges. This might be at the point where I should send a
>> diagram/some sample code since it’s a bit convoluted.
>>
>> >>>>>>>>>>>>> The more I think about the jumps required to make the
>> “simple” union approach work, the more it seems just using the
>> statemapping
>> for steaming is probably more reasonable. Although the state tracking in
>> Spark can be somewhat expensive so it would probably make sense to
>> benchmark to see if it meets our needs.
>>
>> >>>>>>>>>>>> So the problem is, I don't think this can be made to work
>> using mapWithState. It doesn't allow a mapping function that emits
>> infinite
>> output for an input element, directly or not.
>>
>> >>>>>>>>>>> So, provided there is an infinite input (eg pick a never
>> ending
>> queue stream), and each call produces a finite output, we would have an
>> infinite number of calls.
>>
>>
>> >>>>>>>>>>>> Dataflow and Flink, for example, had timer support even
>> before
>> SDFs, and a timer can set another timer and thus end up doing an infinite
>> amount of work in a fault tolerant way - so SDF could be implemented on
>> top
>> of that. But AFAIK spark doesn't have a similar feature, hence my concern.
>>
>> >>>>>>>>>>> So we can do an inifinite queue stream which would allow us to
>> be triggered at each interval and handle our own persistence.
>>
>>
>>
>> >>>>>>>>>>>>> But these still are both DStream based rather than Dataset
>> which we might want to support (depends on what direction folks take with
>> the runners).
>>
>> >>>>>>>>>>>>> If we wanted to do this in the dataset world looking at a
>> custom sink/source would also be an option, (which is effectively what a
>> custom queue stream like thing for dstreams requires), but the datasource
>> APIs are a bit influx so if we ended up doing things at the edge of what’s
>> allowed there’s a good chance we’d have to rewrite it a few times.
>>
>>
>> >>>>>>>>>>>>>>> Assuming that we have a given dstream though in Spark we
>> can get the underlying RDD implementation for each microbatch and do our
>> work inside of that.
>>
>>
>>
>>
>> >>>>>>>>>>>>>>>>> More generally this does raise an important question if
>> we want to target datasets instead of rdds/DStreams in which case i would
>> need to do some more poking.
>>
>>
>> >>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <
>> relax@google.com> wrote:
>>
>> >>>>>>>>>>>>>>>>>>> How would timers be implemented? By outputing and
>> reprocessing, the same way you proposed for SDF?
>>
>> >>>>>>>>>>>>>>>>> i mean the timers could be inside the mappers within the
>> system. Could use a singleton so if a partition is re-executed it doesn’t
>> end up as a straggler.
>>
>>
>>
>> >>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
>> holden@pigscanfly.ca> wrote:
>>
>> >>>>>>>>>>>>>>>>>>>> So the timers would have to be in our own code.
>>
>> >>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>> kirpichov@google.com> wrote:
>>
>> >>>>>>>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has
>> support for state)
>>
>> >>>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <
>> relax@google.com> wrote:
>>
>> >>>>>>>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function
>> to keep track of the computation so far instead of outputting V each time?
>> (also the progress so far is probably of a different type R rather than
>> V).
>>
>>
>> >>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>> holden@pigscanfly.ca> wrote:
>>
>> >>>>>>>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to
>> add something like SplittableDoFns to Spark. I'd done some sketchy
>> thinking
>> about this last year but didn't get very far.
>>
>> >>>>>>>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>> >>>>>>>>>>>>>>>>>>>>>>> For input type T
>> >>>>>>>>>>>>>>>>>>>>>>> Output type V
>>
>> >>>>>>>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>> >>>>>>>>>>>>>>>>>>>>>>> and if the computation finishes T will be
>> populated
>> otherwise V will be
>>
>> >>>>>>>>>>>>>>>>>>>>>>> For determining how long to run we'd up to either
>> K
>> seconds or listen for a signal on a port
>>
>> >>>>>>>>>>>>>>>>>>>>>>> Once we're done running we take the result and
>> filter for the ones with T and V into seperate collections re-run until
>> finished
>> >>>>>>>>>>>>>>>>>>>>>>> and then union the results
>>
>>
>> >>>>>>>>>>>>>>>>>>>>>>> This is maybe not a great design but it was
>> minimally complicated and I figured terrible was a good place to start and
>> improve from.
>>
>>
>> >>>>>>>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts
>> where this is worse than I remember because its been awhile since I
>> thought
>> about this.
>>
>>
>> >>>>>>>>>>>>>>>>>>>>>>> --
>> >>>>>>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>
>> >>>>>>>>>>>>>>>>>>>> --
>> >>>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>
>> >>>>>>>>>>>>>>>>> --
>> >>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>
>> >>>>>>>>>>>>>>> --
>> >>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>
>> >>>>>>>>>>>>> --
>> >>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>
>> >>>>>>>>>>> --
>> >>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>
>>
>>
>>
>>
>> >>>>>>>> --
>> >>>>>>>> Twitter: https://twitter.com/holdenkarau
>>
>>
>>
>>
>> >> --
>> >> Twitter: https://twitter.com/holdenkarau
>>
>

Re: Splittable DoFN in Spark discussion

Posted by Kenneth Knowles <kl...@google.com>.
On Fri, Apr 27, 2018 at 12:06 PM Robert Bradshaw <ro...@google.com>
wrote:

> On Fri, Apr 27, 2018 at 11:56 AM Kenneth Knowles <kl...@google.com> wrote:
>
> > I'm still pretty shallow on this topic & this thread, so forgive if I'm
> restating or missing things.
>
> > My understanding is that the Spark runner does support Beam's triggering
> semantics for unbounded aggregations, using the same support code from
> runners/core that all runners use. Relevant code in SparkTimerInternals and
> SparkGroupAlsoByWindowViaWindowSet.
>
> > IIRC timers are stored in state, scanned each microbatch to see which are
> eligible.
>
> I think the issue (which is more severe in the case of sources) is what to
> do if no more date comes in to trigger another microbatch.
>

So will a streaming pipeline fail to trigger in this case? I have this
feeling the "join with an infinite stream of pings" might already be
happening.

Kenn



> > I don't see an immediate barrier to having timer loops. I don't know
> about performance of this approach, but currently the number of timers per
> shard (key+window) is bounded by their declarations in code, so it is a
> tiny number unless codegenerated. We do later want to have dynamic timers
> (some people call it a TimerMap by analogy with MapState) but I haven't
> seen a design or even a sketch that I can recall.
>
> > Kenn
>
> > On Thu, Apr 26, 2018 at 1:48 PM Holden Karau <ho...@pigscanfly.ca>
> wrote:
>
> >> Yeah that's been the implied source of being able to be continuous, you
> union with a receiver which produce an infinite number of batches (the
> "never ending queue stream" but not actually a queuestream since they have
> some limitations but our own implementation there of).
>
> >> On Tue, Apr 24, 2018 at 11:54 PM, Reuven Lax <re...@google.com> wrote:
>
> >>> Could we do this behind the scenes by writing a Receiver that publishes
> periodic pings?
>
> >>> On Tue, Apr 24, 2018 at 10:09 PM Eugene Kirpichov <
> kirpichov@google.com>
> wrote:
>
> >>>> Kenn - I'm arguing that in Spark SDF style computation can not be
> expressed at all, and neither can Beam's timers.
>
> >>>> Spark, unlike Flink, does not have a timer facility (only state), and
> as far as I can tell its programming model has no other primitive that can
> map a finite RDD into an infinite DStream - the only way to create a new
> infinite DStream appears to be to write a Receiver.
>
> >>>> I cc'd you because I'm wondering whether you've already investigated
> this when considering whether timers can be implemented on the Spark
> runner.
>
> >>>> On Tue, Apr 24, 2018 at 2:53 PM Kenneth Knowles <kl...@google.com>
> wrote:
>
> >>>>> I don't think I understand what the limitations of timers are that
> you are referring to. FWIW I would say implementing other primitives like
> SDF is an explicit non-goal for Beam state & timers.
>
> >>>>> I got lost at some point in this thread, but is it actually necessary
> that a bounded PCollection maps to a finite/bounded structure in Spark?
> Skimming, I'm not sure if the problem is that we can't transliterate Beam
> to Spark (this might be a good sign) or that we can't express SDF style
> computation at all (seems far-fetched, but I could be convinced). Does
> doing a lightweight analysis and just promoting some things to be some kind
> of infinite representation help?
>
> >>>>> Kenn
>
> >>>>> On Tue, Apr 24, 2018 at 2:37 PM Eugene Kirpichov
> >>>>> <ki...@google.com>
> wrote:
>
> >>>>>> Would like to revive this thread one more time.
>
> >>>>>> At this point I'm pretty certain that Spark can't support this out
> of the box and we're gonna have to make changes to Spark.
>
> >>>>>> Holden, could you advise who would be some Spark experts (yourself
> included :) ) who could advise what kind of Spark change would both support
> this AND be useful to the regular Spark community (non-Beam) so that it has
> a chance of finding support? E.g. is there any plan in Spark regarding
> adding timers similar to Flink's or Beam's timers, maybe we could help out
> with that?
>
> >>>>>> +Kenneth Knowles because timers suffer from the same problem.
>
> >>>>>> On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <
> kirpichov@google.com> wrote:
>
> >>>>>>> (resurrecting thread as I'm back from leave)
>
> >>>>>>> I looked at this mode, and indeed as Reuven points out it seems
> that it affects execution details, but doesn't offer any new APIs.
> >>>>>>> Holden - your suggestions of piggybacking an unbounded-per-element
> SDF on top of an infinite stream would work if 1) there was just 1 element
> and 2) the work was guaranteed to be infinite.
>
> >>>>>>> Unfortunately, both of these assumptions are insufficient. In
> particular:
>
> >>>>>>> - 1: The SDF is applied to a PCollection; the PCollection itself
> may be unbounded; and the unbounded work done by the SDF happens for every
> element. E.g. we might have a Kafka topic on which names of Kafka topics
> arrive, and we may end up concurrently reading a continuously growing
> number of topics.
> >>>>>>> - 2: The work per element is not necessarily infinite, it's just
> not guaranteed to be finite - the SDF is allowed at any moment to say
> "Okay, this restriction is done for real" by returning stop() from the
> @ProcessElement method. Continuing the Kafka example, e.g., it could do
> that if the topic/partition being watched is deleted. Having an infinite
> stream as a driver of this process would require being able to send a
> signal to the stream to stop itself.
>
> >>>>>>> Is it looking like there's any other way this can be done in Spark
> as-is, or are we going to have to make changes to Spark to support this?
>
> >>>>>>> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <holden@pigscanfly.ca
> >
> wrote:
>
> >>>>>>>> I mean the new mode is very much in the Dataset not the DStream
> API (although you can use the Dataset API with the old modes too).
>
> >>>>>>>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com>
> wrote:
>
> >>>>>>>>> But this new mode isn't a semantic change, right? It's moving
> away from micro batches into something that looks a lot like what Flink
> does - continuous processing with asynchronous snapshot boundaries.
>
> >>>>>>>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <th...@apache.org>
> wrote:
>
> >>>>>>>>>> Hopefully the new "continuous processing mode" in Spark will
> enable SDF implementation (and real streaming)?
>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Thomas
>
>
> >>>>>>>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <
> holden@pigscanfly.ca> wrote:
>
>
> >>>>>>>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <
> kirpichov@google.com> wrote:
>
>
>
> >>>>>>>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <
> holden@pigscanfly.ca> wrote:
>
> >>>>>>>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <
> kirpichov@google.com> wrote:
>
> >>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <
> holden@pigscanfly.ca> wrote:
>
> >>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
> kirpichov@google.com> wrote:
>
> >>>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <
> holden@pigscanfly.ca> wrote:
>
> >>>>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
> kirpichov@google.com> wrote:
>
> >>>>>>>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk
> for Spark runner streaming. Holden, is it correct that Spark appears to
> have no way at all to produce an infinite DStream from a finite RDD? Maybe
> we can somehow dynamically create a new DStream for every initial
> restriction, said DStream being obtained using a Receiver that under the
> hood actually runs the SDF? (this is of course less efficient than a
> timer-capable runner would do, and I have doubts about the fault tolerance)
>
> >>>>>>>>>>>>>>>>> So on the streaming side we could simply do it with a
> fixed number of levels on DStreams. It’s not great but it would work.
>
> >>>>>>>>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF
> demands of the runner. Imagine the following case: a file contains a list
> of "master" Kafka topics, on which there are published additional Kafka
> topics to read.
>
> >>>>>>>>>>>>>>>> PCollection<String> masterTopics =
> TextIO.read().from(masterTopicsFile)
> >>>>>>>>>>>>>>>> PCollection<String> nestedTopics =
> masterTopics.apply(ParDo(ReadFromKafkaFn))
> >>>>>>>>>>>>>>>> PCollection<String> records =
> nestedTopics.apply(ParDo(ReadFromKafkaFn))
>
> >>>>>>>>>>>>>>>> This exemplifies both use cases of a streaming SDF that
> emits infinite output for every input:
> >>>>>>>>>>>>>>>> - Applying it to a finite set of inputs (in this case to
> the result of reading a text file)
> >>>>>>>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an
> unbounded number of streams being read concurrently, each of the streams
> themselves is unbounded too)
>
> >>>>>>>>>>>>>>>> Does the multi-level solution you have in mind work for
> this case? I suppose the second case is harder, so we can focus on that.
>
> >>>>>>>>>>>>>>> So none of those are a splittabledofn right?
>
> >>>>>>>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is
> a splittable DoFn and we're trying to figure out how to make Spark run it.
>
>
> >>>>>>>>>>>>> Ah ok, sorry I saw that and for some reason parsed them as
> old style DoFns in my head.
>
> >>>>>>>>>>>>> To effectively allow us to union back into the “same” DStream
>   we’d have to end up using Sparks queue streams (or their equivalent
> custom
> source because of some queue stream limitations), which invites some
> reliability challenges. This might be at the point where I should send a
> diagram/some sample code since it’s a bit convoluted.
>
> >>>>>>>>>>>>> The more I think about the jumps required to make the
> “simple” union approach work, the more it seems just using the statemapping
> for steaming is probably more reasonable. Although the state tracking in
> Spark can be somewhat expensive so it would probably make sense to
> benchmark to see if it meets our needs.
>
> >>>>>>>>>>>> So the problem is, I don't think this can be made to work
> using mapWithState. It doesn't allow a mapping function that emits infinite
> output for an input element, directly or not.
>
> >>>>>>>>>>> So, provided there is an infinite input (eg pick a never ending
> queue stream), and each call produces a finite output, we would have an
> infinite number of calls.
>
>
> >>>>>>>>>>>> Dataflow and Flink, for example, had timer support even before
> SDFs, and a timer can set another timer and thus end up doing an infinite
> amount of work in a fault tolerant way - so SDF could be implemented on top
> of that. But AFAIK spark doesn't have a similar feature, hence my concern.
>
> >>>>>>>>>>> So we can do an inifinite queue stream which would allow us to
> be triggered at each interval and handle our own persistence.
>
>
>
> >>>>>>>>>>>>> But these still are both DStream based rather than Dataset
> which we might want to support (depends on what direction folks take with
> the runners).
>
> >>>>>>>>>>>>> If we wanted to do this in the dataset world looking at a
> custom sink/source would also be an option, (which is effectively what a
> custom queue stream like thing for dstreams requires), but the datasource
> APIs are a bit influx so if we ended up doing things at the edge of what’s
> allowed there’s a good chance we’d have to rewrite it a few times.
>
>
> >>>>>>>>>>>>>>> Assuming that we have a given dstream though in Spark we
> can get the underlying RDD implementation for each microbatch and do our
> work inside of that.
>
>
>
>
> >>>>>>>>>>>>>>>>> More generally this does raise an important question if
> we want to target datasets instead of rdds/DStreams in which case i would
> need to do some more poking.
>
>
> >>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <
> relax@google.com> wrote:
>
> >>>>>>>>>>>>>>>>>>> How would timers be implemented? By outputing and
> reprocessing, the same way you proposed for SDF?
>
> >>>>>>>>>>>>>>>>> i mean the timers could be inside the mappers within the
> system. Could use a singleton so if a partition is re-executed it doesn’t
> end up as a straggler.
>
>
>
> >>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
> holden@pigscanfly.ca> wrote:
>
> >>>>>>>>>>>>>>>>>>>> So the timers would have to be in our own code.
>
> >>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
> kirpichov@google.com> wrote:
>
> >>>>>>>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has
> support for state)
>
> >>>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <
> relax@google.com> wrote:
>
> >>>>>>>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function
> to keep track of the computation so far instead of outputting V each time?
> (also the progress so far is probably of a different type R rather than V).
>
>
> >>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
> holden@pigscanfly.ca> wrote:
>
> >>>>>>>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to
> add something like SplittableDoFns to Spark. I'd done some sketchy thinking
> about this last year but didn't get very far.
>
> >>>>>>>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
> >>>>>>>>>>>>>>>>>>>>>>> For input type T
> >>>>>>>>>>>>>>>>>>>>>>> Output type V
>
> >>>>>>>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
> >>>>>>>>>>>>>>>>>>>>>>> and if the computation finishes T will be populated
> otherwise V will be
>
> >>>>>>>>>>>>>>>>>>>>>>> For determining how long to run we'd up to either K
> seconds or listen for a signal on a port
>
> >>>>>>>>>>>>>>>>>>>>>>> Once we're done running we take the result and
> filter for the ones with T and V into seperate collections re-run until
> finished
> >>>>>>>>>>>>>>>>>>>>>>> and then union the results
>
>
> >>>>>>>>>>>>>>>>>>>>>>> This is maybe not a great design but it was
> minimally complicated and I figured terrible was a good place to start and
> improve from.
>
>
> >>>>>>>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts
> where this is worse than I remember because its been awhile since I thought
> about this.
>
>
> >>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>
> >>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>
> >>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>
> >>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>
> >>>>>>>>>>>>> --
> >>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>
> >>>>>>>>>>> --
> >>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>
>
>
>
>
> >>>>>>>> --
> >>>>>>>> Twitter: https://twitter.com/holdenkarau
>
>
>
>
> >> --
> >> Twitter: https://twitter.com/holdenkarau
>

Re: Splittable DoFN in Spark discussion

Posted by Robert Bradshaw <ro...@google.com>.
On Fri, Apr 27, 2018 at 11:56 AM Kenneth Knowles <kl...@google.com> wrote:

> I'm still pretty shallow on this topic & this thread, so forgive if I'm
restating or missing things.

> My understanding is that the Spark runner does support Beam's triggering
semantics for unbounded aggregations, using the same support code from
runners/core that all runners use. Relevant code in SparkTimerInternals and
SparkGroupAlsoByWindowViaWindowSet.

> IIRC timers are stored in state, scanned each microbatch to see which are
eligible.

I think the issue (which is more severe in the case of sources) is what to
do if no more date comes in to trigger another microbatch.

> I don't see an immediate barrier to having timer loops. I don't know
about performance of this approach, but currently the number of timers per
shard (key+window) is bounded by their declarations in code, so it is a
tiny number unless codegenerated. We do later want to have dynamic timers
(some people call it a TimerMap by analogy with MapState) but I haven't
seen a design or even a sketch that I can recall.

> Kenn

> On Thu, Apr 26, 2018 at 1:48 PM Holden Karau <ho...@pigscanfly.ca> wrote:

>> Yeah that's been the implied source of being able to be continuous, you
union with a receiver which produce an infinite number of batches (the
"never ending queue stream" but not actually a queuestream since they have
some limitations but our own implementation there of).

>> On Tue, Apr 24, 2018 at 11:54 PM, Reuven Lax <re...@google.com> wrote:

>>> Could we do this behind the scenes by writing a Receiver that publishes
periodic pings?

>>> On Tue, Apr 24, 2018 at 10:09 PM Eugene Kirpichov <ki...@google.com>
wrote:

>>>> Kenn - I'm arguing that in Spark SDF style computation can not be
expressed at all, and neither can Beam's timers.

>>>> Spark, unlike Flink, does not have a timer facility (only state), and
as far as I can tell its programming model has no other primitive that can
map a finite RDD into an infinite DStream - the only way to create a new
infinite DStream appears to be to write a Receiver.

>>>> I cc'd you because I'm wondering whether you've already investigated
this when considering whether timers can be implemented on the Spark runner.

>>>> On Tue, Apr 24, 2018 at 2:53 PM Kenneth Knowles <kl...@google.com> wrote:

>>>>> I don't think I understand what the limitations of timers are that
you are referring to. FWIW I would say implementing other primitives like
SDF is an explicit non-goal for Beam state & timers.

>>>>> I got lost at some point in this thread, but is it actually necessary
that a bounded PCollection maps to a finite/bounded structure in Spark?
Skimming, I'm not sure if the problem is that we can't transliterate Beam
to Spark (this might be a good sign) or that we can't express SDF style
computation at all (seems far-fetched, but I could be convinced). Does
doing a lightweight analysis and just promoting some things to be some kind
of infinite representation help?

>>>>> Kenn

>>>>> On Tue, Apr 24, 2018 at 2:37 PM Eugene Kirpichov
>>>>> <ki...@google.com>
wrote:

>>>>>> Would like to revive this thread one more time.

>>>>>> At this point I'm pretty certain that Spark can't support this out
of the box and we're gonna have to make changes to Spark.

>>>>>> Holden, could you advise who would be some Spark experts (yourself
included :) ) who could advise what kind of Spark change would both support
this AND be useful to the regular Spark community (non-Beam) so that it has
a chance of finding support? E.g. is there any plan in Spark regarding
adding timers similar to Flink's or Beam's timers, maybe we could help out
with that?

>>>>>> +Kenneth Knowles because timers suffer from the same problem.

>>>>>> On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <
kirpichov@google.com> wrote:

>>>>>>> (resurrecting thread as I'm back from leave)

>>>>>>> I looked at this mode, and indeed as Reuven points out it seems
that it affects execution details, but doesn't offer any new APIs.
>>>>>>> Holden - your suggestions of piggybacking an unbounded-per-element
SDF on top of an infinite stream would work if 1) there was just 1 element
and 2) the work was guaranteed to be infinite.

>>>>>>> Unfortunately, both of these assumptions are insufficient. In
particular:

>>>>>>> - 1: The SDF is applied to a PCollection; the PCollection itself
may be unbounded; and the unbounded work done by the SDF happens for every
element. E.g. we might have a Kafka topic on which names of Kafka topics
arrive, and we may end up concurrently reading a continuously growing
number of topics.
>>>>>>> - 2: The work per element is not necessarily infinite, it's just
not guaranteed to be finite - the SDF is allowed at any moment to say
"Okay, this restriction is done for real" by returning stop() from the
@ProcessElement method. Continuing the Kafka example, e.g., it could do
that if the topic/partition being watched is deleted. Having an infinite
stream as a driver of this process would require being able to send a
signal to the stream to stop itself.

>>>>>>> Is it looking like there's any other way this can be done in Spark
as-is, or are we going to have to make changes to Spark to support this?

>>>>>>> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <ho...@pigscanfly.ca>
wrote:

>>>>>>>> I mean the new mode is very much in the Dataset not the DStream
API (although you can use the Dataset API with the old modes too).

>>>>>>>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com>
wrote:

>>>>>>>>> But this new mode isn't a semantic change, right? It's moving
away from micro batches into something that looks a lot like what Flink
does - continuous processing with asynchronous snapshot boundaries.

>>>>>>>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <th...@apache.org>
wrote:

>>>>>>>>>> Hopefully the new "continuous processing mode" in Spark will
enable SDF implementation (and real streaming)?

>>>>>>>>>> Thanks,
>>>>>>>>>> Thomas


>>>>>>>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <
holden@pigscanfly.ca> wrote:


>>>>>>>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <
kirpichov@google.com> wrote:



>>>>>>>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <
holden@pigscanfly.ca> wrote:

>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <
kirpichov@google.com> wrote:

>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <
holden@pigscanfly.ca> wrote:

>>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
kirpichov@google.com> wrote:

>>>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <
holden@pigscanfly.ca> wrote:

>>>>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
kirpichov@google.com> wrote:

>>>>>>>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk
for Spark runner streaming. Holden, is it correct that Spark appears to
have no way at all to produce an infinite DStream from a finite RDD? Maybe
we can somehow dynamically create a new DStream for every initial
restriction, said DStream being obtained using a Receiver that under the
hood actually runs the SDF? (this is of course less efficient than a
timer-capable runner would do, and I have doubts about the fault tolerance)

>>>>>>>>>>>>>>>>> So on the streaming side we could simply do it with a
fixed number of levels on DStreams. It’s not great but it would work.

>>>>>>>>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF
demands of the runner. Imagine the following case: a file contains a list
of "master" Kafka topics, on which there are published additional Kafka
topics to read.

>>>>>>>>>>>>>>>> PCollection<String> masterTopics =
TextIO.read().from(masterTopicsFile)
>>>>>>>>>>>>>>>> PCollection<String> nestedTopics =
masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>>>>>>>> PCollection<String> records =
nestedTopics.apply(ParDo(ReadFromKafkaFn))

>>>>>>>>>>>>>>>> This exemplifies both use cases of a streaming SDF that
emits infinite output for every input:
>>>>>>>>>>>>>>>> - Applying it to a finite set of inputs (in this case to
the result of reading a text file)
>>>>>>>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an
unbounded number of streams being read concurrently, each of the streams
themselves is unbounded too)

>>>>>>>>>>>>>>>> Does the multi-level solution you have in mind work for
this case? I suppose the second case is harder, so we can focus on that.

>>>>>>>>>>>>>>> So none of those are a splittabledofn right?

>>>>>>>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is
a splittable DoFn and we're trying to figure out how to make Spark run it.


>>>>>>>>>>>>> Ah ok, sorry I saw that and for some reason parsed them as
old style DoFns in my head.

>>>>>>>>>>>>> To effectively allow us to union back into the “same” DStream
  we’d have to end up using Sparks queue streams (or their equivalent custom
source because of some queue stream limitations), which invites some
reliability challenges. This might be at the point where I should send a
diagram/some sample code since it’s a bit convoluted.

>>>>>>>>>>>>> The more I think about the jumps required to make the
“simple” union approach work, the more it seems just using the statemapping
for steaming is probably more reasonable. Although the state tracking in
Spark can be somewhat expensive so it would probably make sense to
benchmark to see if it meets our needs.

>>>>>>>>>>>> So the problem is, I don't think this can be made to work
using mapWithState. It doesn't allow a mapping function that emits infinite
output for an input element, directly or not.

>>>>>>>>>>> So, provided there is an infinite input (eg pick a never ending
queue stream), and each call produces a finite output, we would have an
infinite number of calls.


>>>>>>>>>>>> Dataflow and Flink, for example, had timer support even before
SDFs, and a timer can set another timer and thus end up doing an infinite
amount of work in a fault tolerant way - so SDF could be implemented on top
of that. But AFAIK spark doesn't have a similar feature, hence my concern.

>>>>>>>>>>> So we can do an inifinite queue stream which would allow us to
be triggered at each interval and handle our own persistence.



>>>>>>>>>>>>> But these still are both DStream based rather than Dataset
which we might want to support (depends on what direction folks take with
the runners).

>>>>>>>>>>>>> If we wanted to do this in the dataset world looking at a
custom sink/source would also be an option, (which is effectively what a
custom queue stream like thing for dstreams requires), but the datasource
APIs are a bit influx so if we ended up doing things at the edge of what’s
allowed there’s a good chance we’d have to rewrite it a few times.


>>>>>>>>>>>>>>> Assuming that we have a given dstream though in Spark we
can get the underlying RDD implementation for each microbatch and do our
work inside of that.




>>>>>>>>>>>>>>>>> More generally this does raise an important question if
we want to target datasets instead of rdds/DStreams in which case i would
need to do some more poking.


>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <
relax@google.com> wrote:

>>>>>>>>>>>>>>>>>>> How would timers be implemented? By outputing and
reprocessing, the same way you proposed for SDF?

>>>>>>>>>>>>>>>>> i mean the timers could be inside the mappers within the
system. Could use a singleton so if a partition is re-executed it doesn’t
end up as a straggler.



>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
holden@pigscanfly.ca> wrote:

>>>>>>>>>>>>>>>>>>>> So the timers would have to be in our own code.

>>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
kirpichov@google.com> wrote:

>>>>>>>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has
support for state)

>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <
relax@google.com> wrote:

>>>>>>>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function
to keep track of the computation so far instead of outputting V each time?
(also the progress so far is probably of a different type R rather than V).


>>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
holden@pigscanfly.ca> wrote:

>>>>>>>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to
add something like SplittableDoFns to Spark. I'd done some sketchy thinking
about this last year but didn't get very far.

>>>>>>>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>>>>>>>>>>> Output type V

>>>>>>>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>>>>>>>>>>>> and if the computation finishes T will be populated
otherwise V will be

>>>>>>>>>>>>>>>>>>>>>>> For determining how long to run we'd up to either K
seconds or listen for a signal on a port

>>>>>>>>>>>>>>>>>>>>>>> Once we're done running we take the result and
filter for the ones with T and V into seperate collections re-run until
finished
>>>>>>>>>>>>>>>>>>>>>>> and then union the results


>>>>>>>>>>>>>>>>>>>>>>> This is maybe not a great design but it was
minimally complicated and I figured terrible was a good place to start and
improve from.


>>>>>>>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts
where this is worse than I remember because its been awhile since I thought
about this.


>>>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau

>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau

>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau

>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau

>>>>>>>>>>>>> --
>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau

>>>>>>>>>>> --
>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau





>>>>>>>> --
>>>>>>>> Twitter: https://twitter.com/holdenkarau




>> --
>> Twitter: https://twitter.com/holdenkarau

Re: Splittable DoFN in Spark discussion

Posted by Kenneth Knowles <kl...@google.com>.
I'm still pretty shallow on this topic & this thread, so forgive if I'm
restating or missing things.

My understanding is that the Spark runner does support Beam's triggering
semantics for unbounded aggregations, using the same support code from
runners/core that all runners use. Relevant code in SparkTimerInternals and
SparkGroupAlsoByWindowViaWindowSet.

IIRC timers are stored in state, scanned each microbatch to see which are
eligible. I don't see an immediate barrier to having timer loops. I don't
know about performance of this approach, but currently the number of timers
per shard (key+window) is bounded by their declarations in code, so it is a
tiny number unless codegenerated. We do later want to have dynamic timers
(some people call it a TimerMap by analogy with MapState) but I haven't
seen a design or even a sketch that I can recall.

Kenn

On Thu, Apr 26, 2018 at 1:48 PM Holden Karau <ho...@pigscanfly.ca> wrote:

> Yeah that's been the implied source of being able to be continuous, you
> union with a receiver which produce an infinite number of batches (the
> "never ending queue stream" but not actually a queuestream since they have
> some limitations but our own implementation there of).
>
> On Tue, Apr 24, 2018 at 11:54 PM, Reuven Lax <re...@google.com> wrote:
>
>> Could we do this behind the scenes by writing a Receiver that publishes
>> periodic pings?
>>
>> On Tue, Apr 24, 2018 at 10:09 PM Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>> Kenn - I'm arguing that in Spark SDF style computation can not be
>>> expressed at all, and neither can Beam's timers.
>>>
>>> Spark, unlike Flink, does not have a timer facility (only state), and as
>>> far as I can tell its programming model has no other primitive that can map
>>> a finite RDD into an infinite DStream - the only way to create a new
>>> infinite DStream appears to be to write a Receiver.
>>>
>>> I cc'd you because I'm wondering whether you've already investigated
>>> this when considering whether timers can be implemented on the Spark runner.
>>>
>>> On Tue, Apr 24, 2018 at 2:53 PM Kenneth Knowles <kl...@google.com> wrote:
>>>
>>>> I don't think I understand what the limitations of timers are that you
>>>> are referring to. FWIW I would say implementing other primitives like SDF
>>>> is an explicit non-goal for Beam state & timers.
>>>>
>>>> I got lost at some point in this thread, but is it actually necessary
>>>> that a bounded PCollection maps to a finite/bounded structure in Spark?
>>>> Skimming, I'm not sure if the problem is that we can't transliterate Beam
>>>> to Spark (this might be a good sign) or that we can't express SDF style
>>>> computation at all (seems far-fetched, but I could be convinced). Does
>>>> doing a lightweight analysis and just promoting some things to be some kind
>>>> of infinite representation help?
>>>>
>>>> Kenn
>>>>
>>>> On Tue, Apr 24, 2018 at 2:37 PM Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>>
>>>>> Would like to revive this thread one more time.
>>>>>
>>>>> At this point I'm pretty certain that Spark can't support this out of
>>>>> the box and we're gonna have to make changes to Spark.
>>>>>
>>>>> Holden, could you advise who would be some Spark experts (yourself
>>>>> included :) ) who could advise what kind of Spark change would both support
>>>>> this AND be useful to the regular Spark community (non-Beam) so that it has
>>>>> a chance of finding support? E.g. is there any plan in Spark regarding
>>>>> adding timers similar to Flink's or Beam's timers, maybe we could help out
>>>>> with that?
>>>>>
>>>>> +Kenneth Knowles <kl...@google.com> because timers suffer from the same
>>>>> problem.
>>>>>
>>>>> On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <ki...@google.com>
>>>>> wrote:
>>>>>
>>>>>> (resurrecting thread as I'm back from leave)
>>>>>>
>>>>>> I looked at this mode, and indeed as Reuven points out it seems that
>>>>>> it affects execution details, but doesn't offer any new APIs.
>>>>>> Holden - your suggestions of piggybacking an unbounded-per-element
>>>>>> SDF on top of an infinite stream would work if 1) there was just 1 element
>>>>>> and 2) the work was guaranteed to be infinite.
>>>>>>
>>>>>> Unfortunately, both of these assumptions are insufficient. In
>>>>>> particular:
>>>>>>
>>>>>> - 1: The SDF is applied to a PCollection; the PCollection itself may
>>>>>> be unbounded; and the unbounded work done by the SDF happens for every
>>>>>> element. E.g. we might have a Kafka topic on which names of Kafka topics
>>>>>> arrive, and we may end up concurrently reading a continuously growing
>>>>>> number of topics.
>>>>>> - 2: The work per element is not necessarily infinite, it's just *not
>>>>>> guaranteed to be finite* - the SDF is allowed at any moment to say
>>>>>> "Okay, this restriction is done for real" by returning stop() from the
>>>>>> @ProcessElement method. Continuing the Kafka example, e.g., it could do
>>>>>> that if the topic/partition being watched is deleted. Having an infinite
>>>>>> stream as a driver of this process would require being able to send a
>>>>>> signal to the stream to stop itself.
>>>>>>
>>>>>> Is it looking like there's any other way this can be done in Spark
>>>>>> as-is, or are we going to have to make changes to Spark to support this?
>>>>>>
>>>>>> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>> I mean the new mode is very much in the Dataset not the DStream API
>>>>>>> (although you can use the Dataset API with the old modes too).
>>>>>>>
>>>>>>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> But this new mode isn't a semantic change, right? It's moving away
>>>>>>>> from micro batches into something that looks a lot like what Flink does -
>>>>>>>> continuous processing with asynchronous snapshot boundaries.
>>>>>>>>
>>>>>>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <th...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hopefully the new "continuous processing mode" in Spark will
>>>>>>>>> enable SDF implementation (and real streaming)?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Thomas
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <
>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <
>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <
>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <
>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <
>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <
>>>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for
>>>>>>>>>>>>>>>>> Spark runner streaming. Holden, is it correct that Spark appears to have no
>>>>>>>>>>>>>>>>> way at all to produce an infinite DStream from a finite RDD? Maybe we can
>>>>>>>>>>>>>>>>> somehow dynamically create a new DStream for every initial restriction,
>>>>>>>>>>>>>>>>> said DStream being obtained using a Receiver that under the hood actually
>>>>>>>>>>>>>>>>> runs the SDF? (this is of course less efficient than a timer-capable runner
>>>>>>>>>>>>>>>>> would do, and I have doubts about the fault tolerance)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So on the streaming side we could simply do it with a fixed
>>>>>>>>>>>>>>>> number of levels on DStreams. It’s not great but it would work.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF
>>>>>>>>>>>>>>> demands of the runner. Imagine the following case: a file contains a list
>>>>>>>>>>>>>>> of "master" Kafka topics, on which there are published additional Kafka
>>>>>>>>>>>>>>> topics to read.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> PCollection<String> masterTopics =
>>>>>>>>>>>>>>> TextIO.read().from(masterTopicsFile)
>>>>>>>>>>>>>>> PCollection<String> nestedTopics =
>>>>>>>>>>>>>>> masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>>>>>>> PCollection<String> records =
>>>>>>>>>>>>>>> nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This exemplifies both use cases of a streaming SDF that
>>>>>>>>>>>>>>> emits infinite output for every input:
>>>>>>>>>>>>>>> - Applying it to a finite set of inputs (in this case to the
>>>>>>>>>>>>>>> result of reading a text file)
>>>>>>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an
>>>>>>>>>>>>>>> unbounded number of streams being read concurrently, each of the streams
>>>>>>>>>>>>>>> themselves is unbounded too)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Does the multi-level solution you have in mind work for this
>>>>>>>>>>>>>>> case? I suppose the second case is harder, so we can focus on that.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So none of those are a splittabledofn right?
>>>>>>>>>>>>>>
>>>>>>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>>>>>>>>>>> splittable DoFn and we're trying to figure out how to make Spark run it.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>> Ah ok, sorry I saw that and for some reason parsed them as old
>>>>>>>>>>>> style DoFns in my head.
>>>>>>>>>>>>
>>>>>>>>>>>> To effectively allow us to union back into the “same” DStream
>>>>>>>>>>>>  we’d have to end up using Sparks queue streams (or their equivalent custom
>>>>>>>>>>>> source because of some queue stream limitations), which invites some
>>>>>>>>>>>> reliability challenges. This might be at the point where I should send a
>>>>>>>>>>>> diagram/some sample code since it’s a bit convoluted.
>>>>>>>>>>>>
>>>>>>>>>>>> The more I think about the jumps required to make the “simple”
>>>>>>>>>>>> union approach work, the more it seems just using the statemapping for
>>>>>>>>>>>> steaming is probably more reasonable. Although the state tracking in Spark
>>>>>>>>>>>> can be somewhat expensive so it would probably make sense to benchmark to
>>>>>>>>>>>> see if it meets our needs.
>>>>>>>>>>>>
>>>>>>>>>>> So the problem is, I don't think this can be made to work using
>>>>>>>>>>> mapWithState. It doesn't allow a mapping function that emits infinite
>>>>>>>>>>> output for an input element, directly or not.
>>>>>>>>>>>
>>>>>>>>>> So, provided there is an infinite input (eg pick a never ending
>>>>>>>>>> queue stream), and each call produces a finite output, we would have an
>>>>>>>>>> infinite number of calls.
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Dataflow and Flink, for example, had timer support even before
>>>>>>>>>>> SDFs, and a timer can set another timer and thus end up doing an infinite
>>>>>>>>>>> amount of work in a fault tolerant way - so SDF could be implemented on top
>>>>>>>>>>> of that. But AFAIK spark doesn't have a similar feature, hence my concern.
>>>>>>>>>>>
>>>>>>>>>> So we can do an inifinite queue stream which would allow us to be
>>>>>>>>>> triggered at each interval and handle our own persistence.
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> But these still are both DStream based rather than Dataset
>>>>>>>>>>>> which we might want to support (depends on what direction folks take with
>>>>>>>>>>>> the runners).
>>>>>>>>>>>>
>>>>>>>>>>>> If we wanted to do this in the dataset world looking at a
>>>>>>>>>>>> custom sink/source would also be an option, (which is effectively what a
>>>>>>>>>>>> custom queue stream like thing for dstreams requires), but the datasource
>>>>>>>>>>>> APIs are a bit influx so if we ended up doing things at the edge of what’s
>>>>>>>>>>>> allowed there’s a good chance we’d have to rewrite it a few times.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>> Assuming that we have a given dstream though in Spark we can
>>>>>>>>>>>>>> get the underlying RDD implementation for each microbatch and do our work
>>>>>>>>>>>>>> inside of that.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> More generally this does raise an important question if we
>>>>>>>>>>>>>>>> want to target datasets instead of rdds/DStreams in which case i would need
>>>>>>>>>>>>>>>> to do some more poking.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <
>>>>>>>>>>>>>>>>> relax@google.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> How would timers be implemented? By outputing and
>>>>>>>>>>>>>>>>>> reprocessing, the same way you proposed for SDF?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> i mean the timers could be inside the mappers within the
>>>>>>>>>>>>>>>> system. Could use a singleton so if a partition is re-executed it doesn’t
>>>>>>>>>>>>>>>> end up as a straggler.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
>>>>>>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has
>>>>>>>>>>>>>>>>>>>> support for state)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <
>>>>>>>>>>>>>>>>>>>> relax@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function to
>>>>>>>>>>>>>>>>>>>>> keep track of the computation so far instead of outputting V each time?
>>>>>>>>>>>>>>>>>>>>> (also the progress so far is probably of a different type R rather than V).
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to
>>>>>>>>>>>>>>>>>>>>>> add something like SplittableDoFns to Spark. I'd done some sketchy thinking
>>>>>>>>>>>>>>>>>>>>>> about this last year but didn't get very far.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>>>>>>>>>>> and if the computation finishes T will be populated
>>>>>>>>>>>>>>>>>>>>>> otherwise V will be
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> For determining how long to run we'd up to either K
>>>>>>>>>>>>>>>>>>>>>> seconds or listen for a signal on a port
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Once we're done running we take the result and filter
>>>>>>>>>>>>>>>>>>>>>> for the ones with T and V into seperate collections re-run until finished
>>>>>>>>>>>>>>>>>>>>>> and then union the results
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>>>>>>>>>>>> complicated and I figured terrible was a good place to start and improve
>>>>>>>>>>>>>>>>>>>>>> from.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where
>>>>>>>>>>>>>>>>>>>>>> this is worse than I remember because its been awhile since I thought about
>>>>>>>>>>>>>>>>>>>>>> this.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>
>>>>>>
>
>
> --
> Twitter: https://twitter.com/holdenkarau
>

Re: Splittable DoFN in Spark discussion

Posted by Holden Karau <ho...@pigscanfly.ca>.
Yeah that's been the implied source of being able to be continuous, you
union with a receiver which produce an infinite number of batches (the
"never ending queue stream" but not actually a queuestream since they have
some limitations but our own implementation there of).

On Tue, Apr 24, 2018 at 11:54 PM, Reuven Lax <re...@google.com> wrote:

> Could we do this behind the scenes by writing a Receiver that publishes
> periodic pings?
>
> On Tue, Apr 24, 2018 at 10:09 PM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> Kenn - I'm arguing that in Spark SDF style computation can not be
>> expressed at all, and neither can Beam's timers.
>>
>> Spark, unlike Flink, does not have a timer facility (only state), and as
>> far as I can tell its programming model has no other primitive that can map
>> a finite RDD into an infinite DStream - the only way to create a new
>> infinite DStream appears to be to write a Receiver.
>>
>> I cc'd you because I'm wondering whether you've already investigated this
>> when considering whether timers can be implemented on the Spark runner.
>>
>> On Tue, Apr 24, 2018 at 2:53 PM Kenneth Knowles <kl...@google.com> wrote:
>>
>>> I don't think I understand what the limitations of timers are that you
>>> are referring to. FWIW I would say implementing other primitives like SDF
>>> is an explicit non-goal for Beam state & timers.
>>>
>>> I got lost at some point in this thread, but is it actually necessary
>>> that a bounded PCollection maps to a finite/bounded structure in Spark?
>>> Skimming, I'm not sure if the problem is that we can't transliterate Beam
>>> to Spark (this might be a good sign) or that we can't express SDF style
>>> computation at all (seems far-fetched, but I could be convinced). Does
>>> doing a lightweight analysis and just promoting some things to be some kind
>>> of infinite representation help?
>>>
>>> Kenn
>>>
>>> On Tue, Apr 24, 2018 at 2:37 PM Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>>
>>>> Would like to revive this thread one more time.
>>>>
>>>> At this point I'm pretty certain that Spark can't support this out of
>>>> the box and we're gonna have to make changes to Spark.
>>>>
>>>> Holden, could you advise who would be some Spark experts (yourself
>>>> included :) ) who could advise what kind of Spark change would both support
>>>> this AND be useful to the regular Spark community (non-Beam) so that it has
>>>> a chance of finding support? E.g. is there any plan in Spark regarding
>>>> adding timers similar to Flink's or Beam's timers, maybe we could help out
>>>> with that?
>>>>
>>>> +Kenneth Knowles <kl...@google.com> because timers suffer from the same
>>>> problem.
>>>>
>>>> On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>>
>>>>> (resurrecting thread as I'm back from leave)
>>>>>
>>>>> I looked at this mode, and indeed as Reuven points out it seems that
>>>>> it affects execution details, but doesn't offer any new APIs.
>>>>> Holden - your suggestions of piggybacking an unbounded-per-element SDF
>>>>> on top of an infinite stream would work if 1) there was just 1 element and
>>>>> 2) the work was guaranteed to be infinite.
>>>>>
>>>>> Unfortunately, both of these assumptions are insufficient. In
>>>>> particular:
>>>>>
>>>>> - 1: The SDF is applied to a PCollection; the PCollection itself may
>>>>> be unbounded; and the unbounded work done by the SDF happens for every
>>>>> element. E.g. we might have a Kafka topic on which names of Kafka topics
>>>>> arrive, and we may end up concurrently reading a continuously growing
>>>>> number of topics.
>>>>> - 2: The work per element is not necessarily infinite, it's just *not
>>>>> guaranteed to be finite* - the SDF is allowed at any moment to say
>>>>> "Okay, this restriction is done for real" by returning stop() from the
>>>>> @ProcessElement method. Continuing the Kafka example, e.g., it could do
>>>>> that if the topic/partition being watched is deleted. Having an infinite
>>>>> stream as a driver of this process would require being able to send a
>>>>> signal to the stream to stop itself.
>>>>>
>>>>> Is it looking like there's any other way this can be done in Spark
>>>>> as-is, or are we going to have to make changes to Spark to support this?
>>>>>
>>>>> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <ho...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> I mean the new mode is very much in the Dataset not the DStream API
>>>>>> (although you can use the Dataset API with the old modes too).
>>>>>>
>>>>>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> But this new mode isn't a semantic change, right? It's moving away
>>>>>>> from micro batches into something that looks a lot like what Flink does -
>>>>>>> continuous processing with asynchronous snapshot boundaries.
>>>>>>>
>>>>>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <th...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hopefully the new "continuous processing mode" in Spark will enable
>>>>>>>> SDF implementation (and real streaming)?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Thomas
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <holden@pigscanfly.ca
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <
>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <
>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <
>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <
>>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for
>>>>>>>>>>>>>>>> Spark runner streaming. Holden, is it correct that Spark appears to have no
>>>>>>>>>>>>>>>> way at all to produce an infinite DStream from a finite RDD? Maybe we can
>>>>>>>>>>>>>>>> somehow dynamically create a new DStream for every initial restriction,
>>>>>>>>>>>>>>>> said DStream being obtained using a Receiver that under the hood actually
>>>>>>>>>>>>>>>> runs the SDF? (this is of course less efficient than a timer-capable runner
>>>>>>>>>>>>>>>> would do, and I have doubts about the fault tolerance)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So on the streaming side we could simply do it with a fixed
>>>>>>>>>>>>>>> number of levels on DStreams. It’s not great but it would work.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF
>>>>>>>>>>>>>> demands of the runner. Imagine the following case: a file contains a list
>>>>>>>>>>>>>> of "master" Kafka topics, on which there are published additional Kafka
>>>>>>>>>>>>>> topics to read.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> PCollection<String> masterTopics = TextIO.read().from(
>>>>>>>>>>>>>> masterTopicsFile)
>>>>>>>>>>>>>> PCollection<String> nestedTopics = masterTopics.apply(ParDo(
>>>>>>>>>>>>>> ReadFromKafkaFn))
>>>>>>>>>>>>>> PCollection<String> records = nestedTopics.apply(ParDo(
>>>>>>>>>>>>>> ReadFromKafkaFn))
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>>>>>>>>>> infinite output for every input:
>>>>>>>>>>>>>> - Applying it to a finite set of inputs (in this case to the
>>>>>>>>>>>>>> result of reading a text file)
>>>>>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an
>>>>>>>>>>>>>> unbounded number of streams being read concurrently, each of the streams
>>>>>>>>>>>>>> themselves is unbounded too)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Does the multi-level solution you have in mind work for this
>>>>>>>>>>>>>> case? I suppose the second case is harder, so we can focus on that.
>>>>>>>>>>>>>>
>>>>>>>>>>>>> So none of those are a splittabledofn right?
>>>>>>>>>>>>>
>>>>>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>>>>>>>>>> splittable DoFn and we're trying to figure out how to make Spark run it.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>> Ah ok, sorry I saw that and for some reason parsed them as old
>>>>>>>>>>> style DoFns in my head.
>>>>>>>>>>>
>>>>>>>>>>> To effectively allow us to union back into the “same” DStream
>>>>>>>>>>>  we’d have to end up using Sparks queue streams (or their equivalent custom
>>>>>>>>>>> source because of some queue stream limitations), which invites some
>>>>>>>>>>> reliability challenges. This might be at the point where I should send a
>>>>>>>>>>> diagram/some sample code since it’s a bit convoluted.
>>>>>>>>>>>
>>>>>>>>>>> The more I think about the jumps required to make the “simple”
>>>>>>>>>>> union approach work, the more it seems just using the statemapping for
>>>>>>>>>>> steaming is probably more reasonable. Although the state tracking in Spark
>>>>>>>>>>> can be somewhat expensive so it would probably make sense to benchmark to
>>>>>>>>>>> see if it meets our needs.
>>>>>>>>>>>
>>>>>>>>>> So the problem is, I don't think this can be made to work using
>>>>>>>>>> mapWithState. It doesn't allow a mapping function that emits infinite
>>>>>>>>>> output for an input element, directly or not.
>>>>>>>>>>
>>>>>>>>> So, provided there is an infinite input (eg pick a never ending
>>>>>>>>> queue stream), and each call produces a finite output, we would have an
>>>>>>>>> infinite number of calls.
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Dataflow and Flink, for example, had timer support even before
>>>>>>>>>> SDFs, and a timer can set another timer and thus end up doing an infinite
>>>>>>>>>> amount of work in a fault tolerant way - so SDF could be implemented on top
>>>>>>>>>> of that. But AFAIK spark doesn't have a similar feature, hence my concern.
>>>>>>>>>>
>>>>>>>>> So we can do an inifinite queue stream which would allow us to be
>>>>>>>>> triggered at each interval and handle our own persistence.
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> But these still are both DStream based rather than Dataset which
>>>>>>>>>>> we might want to support (depends on what direction folks take with the
>>>>>>>>>>> runners).
>>>>>>>>>>>
>>>>>>>>>>> If we wanted to do this in the dataset world looking at a custom
>>>>>>>>>>> sink/source would also be an option, (which is effectively what a custom
>>>>>>>>>>> queue stream like thing for dstreams requires), but the datasource APIs are
>>>>>>>>>>> a bit influx so if we ended up doing things at the edge of what’s allowed
>>>>>>>>>>> there’s a good chance we’d have to rewrite it a few times.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>> Assuming that we have a given dstream though in Spark we can
>>>>>>>>>>>>> get the underlying RDD implementation for each microbatch and do our work
>>>>>>>>>>>>> inside of that.
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> More generally this does raise an important question if we
>>>>>>>>>>>>>>> want to target datasets instead of rdds/DStreams in which case i would need
>>>>>>>>>>>>>>> to do some more poking.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <
>>>>>>>>>>>>>>>> relax@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> How would timers be implemented? By outputing and
>>>>>>>>>>>>>>>>> reprocessing, the same way you proposed for SDF?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> i mean the timers could be inside the mappers within the
>>>>>>>>>>>>>>> system. Could use a singleton so if a partition is re-executed it doesn’t
>>>>>>>>>>>>>>> end up as a straggler.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
>>>>>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has
>>>>>>>>>>>>>>>>>>> support for state)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <
>>>>>>>>>>>>>>>>>>> relax@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function to
>>>>>>>>>>>>>>>>>>>> keep track of the computation so far instead of outputting V each time?
>>>>>>>>>>>>>>>>>>>> (also the progress so far is probably of a different type R rather than V).
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some sketchy thinking
>>>>>>>>>>>>>>>>>>>>> about this last year but didn't get very far.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>>>>>>>>>> and if the computation finishes T will be populated
>>>>>>>>>>>>>>>>>>>>> otherwise V will be
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> For determining how long to run we'd up to either K
>>>>>>>>>>>>>>>>>>>>> seconds or listen for a signal on a port
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Once we're done running we take the result and filter
>>>>>>>>>>>>>>>>>>>>> for the ones with T and V into seperate collections re-run until finished
>>>>>>>>>>>>>>>>>>>>> and then union the results
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>>>>>>>>>>> complicated and I figured terrible was a good place to start and improve
>>>>>>>>>>>>>>>>>>>>> from.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where
>>>>>>>>>>>>>>>>>>>>> this is worse than I remember because its been awhile since I thought about
>>>>>>>>>>>>>>>>>>>>> this.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>
>>>>>


-- 
Twitter: https://twitter.com/holdenkarau

Re: Splittable DoFN in Spark discussion

Posted by Reuven Lax <re...@google.com>.
Could we do this behind the scenes by writing a Receiver that publishes
periodic pings?

On Tue, Apr 24, 2018 at 10:09 PM Eugene Kirpichov <ki...@google.com>
wrote:

> Kenn - I'm arguing that in Spark SDF style computation can not be
> expressed at all, and neither can Beam's timers.
>
> Spark, unlike Flink, does not have a timer facility (only state), and as
> far as I can tell its programming model has no other primitive that can map
> a finite RDD into an infinite DStream - the only way to create a new
> infinite DStream appears to be to write a Receiver.
>
> I cc'd you because I'm wondering whether you've already investigated this
> when considering whether timers can be implemented on the Spark runner.
>
> On Tue, Apr 24, 2018 at 2:53 PM Kenneth Knowles <kl...@google.com> wrote:
>
>> I don't think I understand what the limitations of timers are that you
>> are referring to. FWIW I would say implementing other primitives like SDF
>> is an explicit non-goal for Beam state & timers.
>>
>> I got lost at some point in this thread, but is it actually necessary
>> that a bounded PCollection maps to a finite/bounded structure in Spark?
>> Skimming, I'm not sure if the problem is that we can't transliterate Beam
>> to Spark (this might be a good sign) or that we can't express SDF style
>> computation at all (seems far-fetched, but I could be convinced). Does
>> doing a lightweight analysis and just promoting some things to be some kind
>> of infinite representation help?
>>
>> Kenn
>>
>> On Tue, Apr 24, 2018 at 2:37 PM Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>> Would like to revive this thread one more time.
>>>
>>> At this point I'm pretty certain that Spark can't support this out of
>>> the box and we're gonna have to make changes to Spark.
>>>
>>> Holden, could you advise who would be some Spark experts (yourself
>>> included :) ) who could advise what kind of Spark change would both support
>>> this AND be useful to the regular Spark community (non-Beam) so that it has
>>> a chance of finding support? E.g. is there any plan in Spark regarding
>>> adding timers similar to Flink's or Beam's timers, maybe we could help out
>>> with that?
>>>
>>> +Kenneth Knowles <kl...@google.com> because timers suffer from the same
>>> problem.
>>>
>>> On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>>
>>>> (resurrecting thread as I'm back from leave)
>>>>
>>>> I looked at this mode, and indeed as Reuven points out it seems that it
>>>> affects execution details, but doesn't offer any new APIs.
>>>> Holden - your suggestions of piggybacking an unbounded-per-element SDF
>>>> on top of an infinite stream would work if 1) there was just 1 element and
>>>> 2) the work was guaranteed to be infinite.
>>>>
>>>> Unfortunately, both of these assumptions are insufficient. In
>>>> particular:
>>>>
>>>> - 1: The SDF is applied to a PCollection; the PCollection itself may be
>>>> unbounded; and the unbounded work done by the SDF happens for every
>>>> element. E.g. we might have a Kafka topic on which names of Kafka topics
>>>> arrive, and we may end up concurrently reading a continuously growing
>>>> number of topics.
>>>> - 2: The work per element is not necessarily infinite, it's just *not
>>>> guaranteed to be finite* - the SDF is allowed at any moment to say
>>>> "Okay, this restriction is done for real" by returning stop() from the
>>>> @ProcessElement method. Continuing the Kafka example, e.g., it could do
>>>> that if the topic/partition being watched is deleted. Having an infinite
>>>> stream as a driver of this process would require being able to send a
>>>> signal to the stream to stop itself.
>>>>
>>>> Is it looking like there's any other way this can be done in Spark
>>>> as-is, or are we going to have to make changes to Spark to support this?
>>>>
>>>> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <ho...@pigscanfly.ca>
>>>> wrote:
>>>>
>>>>> I mean the new mode is very much in the Dataset not the DStream API
>>>>> (although you can use the Dataset API with the old modes too).
>>>>>
>>>>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> But this new mode isn't a semantic change, right? It's moving away
>>>>>> from micro batches into something that looks a lot like what Flink does -
>>>>>> continuous processing with asynchronous snapshot boundaries.
>>>>>>
>>>>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <th...@apache.org> wrote:
>>>>>>
>>>>>>> Hopefully the new "continuous processing mode" in Spark will enable
>>>>>>> SDF implementation (and real streaming)?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Thomas
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <ho...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <
>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <
>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <
>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <
>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for
>>>>>>>>>>>>>>> Spark runner streaming. Holden, is it correct that Spark appears to have no
>>>>>>>>>>>>>>> way at all to produce an infinite DStream from a finite RDD? Maybe we can
>>>>>>>>>>>>>>> somehow dynamically create a new DStream for every initial restriction,
>>>>>>>>>>>>>>> said DStream being obtained using a Receiver that under the hood actually
>>>>>>>>>>>>>>> runs the SDF? (this is of course less efficient than a timer-capable runner
>>>>>>>>>>>>>>> would do, and I have doubts about the fault tolerance)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So on the streaming side we could simply do it with a fixed
>>>>>>>>>>>>>> number of levels on DStreams. It’s not great but it would work.
>>>>>>>>>>>>>>
>>>>>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF
>>>>>>>>>>>>> demands of the runner. Imagine the following case: a file contains a list
>>>>>>>>>>>>> of "master" Kafka topics, on which there are published additional Kafka
>>>>>>>>>>>>> topics to read.
>>>>>>>>>>>>>
>>>>>>>>>>>>> PCollection<String> masterTopics =
>>>>>>>>>>>>> TextIO.read().from(masterTopicsFile)
>>>>>>>>>>>>> PCollection<String> nestedTopics =
>>>>>>>>>>>>> masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>>>>> PCollection<String> records =
>>>>>>>>>>>>> nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>>>>>
>>>>>>>>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>>>>>>>>> infinite output for every input:
>>>>>>>>>>>>> - Applying it to a finite set of inputs (in this case to the
>>>>>>>>>>>>> result of reading a text file)
>>>>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an
>>>>>>>>>>>>> unbounded number of streams being read concurrently, each of the streams
>>>>>>>>>>>>> themselves is unbounded too)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Does the multi-level solution you have in mind work for this
>>>>>>>>>>>>> case? I suppose the second case is harder, so we can focus on that.
>>>>>>>>>>>>>
>>>>>>>>>>>> So none of those are a splittabledofn right?
>>>>>>>>>>>>
>>>>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>>>>>>>>> splittable DoFn and we're trying to figure out how to make Spark run it.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> Ah ok, sorry I saw that and for some reason parsed them as old
>>>>>>>>>> style DoFns in my head.
>>>>>>>>>>
>>>>>>>>>> To effectively allow us to union back into the “same” DStream
>>>>>>>>>>  we’d have to end up using Sparks queue streams (or their equivalent custom
>>>>>>>>>> source because of some queue stream limitations), which invites some
>>>>>>>>>> reliability challenges. This might be at the point where I should send a
>>>>>>>>>> diagram/some sample code since it’s a bit convoluted.
>>>>>>>>>>
>>>>>>>>>> The more I think about the jumps required to make the “simple”
>>>>>>>>>> union approach work, the more it seems just using the statemapping for
>>>>>>>>>> steaming is probably more reasonable. Although the state tracking in Spark
>>>>>>>>>> can be somewhat expensive so it would probably make sense to benchmark to
>>>>>>>>>> see if it meets our needs.
>>>>>>>>>>
>>>>>>>>> So the problem is, I don't think this can be made to work using
>>>>>>>>> mapWithState. It doesn't allow a mapping function that emits infinite
>>>>>>>>> output for an input element, directly or not.
>>>>>>>>>
>>>>>>>> So, provided there is an infinite input (eg pick a never ending
>>>>>>>> queue stream), and each call produces a finite output, we would have an
>>>>>>>> infinite number of calls.
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Dataflow and Flink, for example, had timer support even before
>>>>>>>>> SDFs, and a timer can set another timer and thus end up doing an infinite
>>>>>>>>> amount of work in a fault tolerant way - so SDF could be implemented on top
>>>>>>>>> of that. But AFAIK spark doesn't have a similar feature, hence my concern.
>>>>>>>>>
>>>>>>>> So we can do an inifinite queue stream which would allow us to be
>>>>>>>> triggered at each interval and handle our own persistence.
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> But these still are both DStream based rather than Dataset which
>>>>>>>>>> we might want to support (depends on what direction folks take with the
>>>>>>>>>> runners).
>>>>>>>>>>
>>>>>>>>>> If we wanted to do this in the dataset world looking at a custom
>>>>>>>>>> sink/source would also be an option, (which is effectively what a custom
>>>>>>>>>> queue stream like thing for dstreams requires), but the datasource APIs are
>>>>>>>>>> a bit influx so if we ended up doing things at the edge of what’s allowed
>>>>>>>>>> there’s a good chance we’d have to rewrite it a few times.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>> Assuming that we have a given dstream though in Spark we can
>>>>>>>>>>>> get the underlying RDD implementation for each microbatch and do our work
>>>>>>>>>>>> inside of that.
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> More generally this does raise an important question if we
>>>>>>>>>>>>>> want to target datasets instead of rdds/DStreams in which case i would need
>>>>>>>>>>>>>> to do some more poking.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <
>>>>>>>>>>>>>>> relax@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> How would timers be implemented? By outputing and
>>>>>>>>>>>>>>>> reprocessing, the same way you proposed for SDF?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> i mean the timers could be inside the mappers within the
>>>>>>>>>>>>>> system. Could use a singleton so if a partition is re-executed it doesn’t
>>>>>>>>>>>>>> end up as a straggler.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
>>>>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has
>>>>>>>>>>>>>>>>>> support for state)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <
>>>>>>>>>>>>>>>>>> relax@google.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function to
>>>>>>>>>>>>>>>>>>> keep track of the computation so far instead of outputting V each time?
>>>>>>>>>>>>>>>>>>> (also the progress so far is probably of a different type R rather than V).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some sketchy thinking
>>>>>>>>>>>>>>>>>>>> about this last year but didn't get very far.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>>>>>>>>> and if the computation finishes T will be populated
>>>>>>>>>>>>>>>>>>>> otherwise V will be
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For determining how long to run we'd up to either K
>>>>>>>>>>>>>>>>>>>> seconds or listen for a signal on a port
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Once we're done running we take the result and filter
>>>>>>>>>>>>>>>>>>>> for the ones with T and V into seperate collections re-run until finished
>>>>>>>>>>>>>>>>>>>> and then union the results
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>>>>>>>>>> complicated and I figured terrible was a good place to start and improve
>>>>>>>>>>>>>>>>>>>> from.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where
>>>>>>>>>>>>>>>>>>>> this is worse than I remember because its been awhile since I thought about
>>>>>>>>>>>>>>>>>>>> this.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>
>>>>>>>>> --
>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>
>>>>

Re: Splittable DoFN in Spark discussion

Posted by Eugene Kirpichov <ki...@google.com>.
Kenn - I'm arguing that in Spark SDF style computation can not be expressed
at all, and neither can Beam's timers.

Spark, unlike Flink, does not have a timer facility (only state), and as
far as I can tell its programming model has no other primitive that can map
a finite RDD into an infinite DStream - the only way to create a new
infinite DStream appears to be to write a Receiver.

I cc'd you because I'm wondering whether you've already investigated this
when considering whether timers can be implemented on the Spark runner.

On Tue, Apr 24, 2018 at 2:53 PM Kenneth Knowles <kl...@google.com> wrote:

> I don't think I understand what the limitations of timers are that you are
> referring to. FWIW I would say implementing other primitives like SDF is an
> explicit non-goal for Beam state & timers.
>
> I got lost at some point in this thread, but is it actually necessary that
> a bounded PCollection maps to a finite/bounded structure in Spark?
> Skimming, I'm not sure if the problem is that we can't transliterate Beam
> to Spark (this might be a good sign) or that we can't express SDF style
> computation at all (seems far-fetched, but I could be convinced). Does
> doing a lightweight analysis and just promoting some things to be some kind
> of infinite representation help?
>
> Kenn
>
> On Tue, Apr 24, 2018 at 2:37 PM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> Would like to revive this thread one more time.
>>
>> At this point I'm pretty certain that Spark can't support this out of the
>> box and we're gonna have to make changes to Spark.
>>
>> Holden, could you advise who would be some Spark experts (yourself
>> included :) ) who could advise what kind of Spark change would both support
>> this AND be useful to the regular Spark community (non-Beam) so that it has
>> a chance of finding support? E.g. is there any plan in Spark regarding
>> adding timers similar to Flink's or Beam's timers, maybe we could help out
>> with that?
>>
>> +Kenneth Knowles <kl...@google.com> because timers suffer from the same
>> problem.
>>
>> On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>> (resurrecting thread as I'm back from leave)
>>>
>>> I looked at this mode, and indeed as Reuven points out it seems that it
>>> affects execution details, but doesn't offer any new APIs.
>>> Holden - your suggestions of piggybacking an unbounded-per-element SDF
>>> on top of an infinite stream would work if 1) there was just 1 element and
>>> 2) the work was guaranteed to be infinite.
>>>
>>> Unfortunately, both of these assumptions are insufficient. In particular:
>>>
>>> - 1: The SDF is applied to a PCollection; the PCollection itself may be
>>> unbounded; and the unbounded work done by the SDF happens for every
>>> element. E.g. we might have a Kafka topic on which names of Kafka topics
>>> arrive, and we may end up concurrently reading a continuously growing
>>> number of topics.
>>> - 2: The work per element is not necessarily infinite, it's just *not
>>> guaranteed to be finite* - the SDF is allowed at any moment to say
>>> "Okay, this restriction is done for real" by returning stop() from the
>>> @ProcessElement method. Continuing the Kafka example, e.g., it could do
>>> that if the topic/partition being watched is deleted. Having an infinite
>>> stream as a driver of this process would require being able to send a
>>> signal to the stream to stop itself.
>>>
>>> Is it looking like there's any other way this can be done in Spark
>>> as-is, or are we going to have to make changes to Spark to support this?
>>>
>>> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <ho...@pigscanfly.ca>
>>> wrote:
>>>
>>>> I mean the new mode is very much in the Dataset not the DStream API
>>>> (although you can use the Dataset API with the old modes too).
>>>>
>>>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> But this new mode isn't a semantic change, right? It's moving away
>>>>> from micro batches into something that looks a lot like what Flink does -
>>>>> continuous processing with asynchronous snapshot boundaries.
>>>>>
>>>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <th...@apache.org> wrote:
>>>>>
>>>>>> Hopefully the new "continuous processing mode" in Spark will enable
>>>>>> SDF implementation (and real streaming)?
>>>>>>
>>>>>> Thanks,
>>>>>> Thomas
>>>>>>
>>>>>>
>>>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <ho...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <
>>>>>>> kirpichov@google.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <
>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <
>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <
>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for
>>>>>>>>>>>>>> Spark runner streaming. Holden, is it correct that Spark appears to have no
>>>>>>>>>>>>>> way at all to produce an infinite DStream from a finite RDD? Maybe we can
>>>>>>>>>>>>>> somehow dynamically create a new DStream for every initial restriction,
>>>>>>>>>>>>>> said DStream being obtained using a Receiver that under the hood actually
>>>>>>>>>>>>>> runs the SDF? (this is of course less efficient than a timer-capable runner
>>>>>>>>>>>>>> would do, and I have doubts about the fault tolerance)
>>>>>>>>>>>>>>
>>>>>>>>>>>>> So on the streaming side we could simply do it with a fixed
>>>>>>>>>>>>> number of levels on DStreams. It’s not great but it would work.
>>>>>>>>>>>>>
>>>>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF
>>>>>>>>>>>> demands of the runner. Imagine the following case: a file contains a list
>>>>>>>>>>>> of "master" Kafka topics, on which there are published additional Kafka
>>>>>>>>>>>> topics to read.
>>>>>>>>>>>>
>>>>>>>>>>>> PCollection<String> masterTopics =
>>>>>>>>>>>> TextIO.read().from(masterTopicsFile)
>>>>>>>>>>>> PCollection<String> nestedTopics =
>>>>>>>>>>>> masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>>>> PCollection<String> records =
>>>>>>>>>>>> nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>>>>
>>>>>>>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>>>>>>>> infinite output for every input:
>>>>>>>>>>>> - Applying it to a finite set of inputs (in this case to the
>>>>>>>>>>>> result of reading a text file)
>>>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an
>>>>>>>>>>>> unbounded number of streams being read concurrently, each of the streams
>>>>>>>>>>>> themselves is unbounded too)
>>>>>>>>>>>>
>>>>>>>>>>>> Does the multi-level solution you have in mind work for this
>>>>>>>>>>>> case? I suppose the second case is harder, so we can focus on that.
>>>>>>>>>>>>
>>>>>>>>>>> So none of those are a splittabledofn right?
>>>>>>>>>>>
>>>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>>>>>>>> splittable DoFn and we're trying to figure out how to make Spark run it.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>> Ah ok, sorry I saw that and for some reason parsed them as old
>>>>>>>>> style DoFns in my head.
>>>>>>>>>
>>>>>>>>> To effectively allow us to union back into the “same” DStream
>>>>>>>>>  we’d have to end up using Sparks queue streams (or their equivalent custom
>>>>>>>>> source because of some queue stream limitations), which invites some
>>>>>>>>> reliability challenges. This might be at the point where I should send a
>>>>>>>>> diagram/some sample code since it’s a bit convoluted.
>>>>>>>>>
>>>>>>>>> The more I think about the jumps required to make the “simple”
>>>>>>>>> union approach work, the more it seems just using the statemapping for
>>>>>>>>> steaming is probably more reasonable. Although the state tracking in Spark
>>>>>>>>> can be somewhat expensive so it would probably make sense to benchmark to
>>>>>>>>> see if it meets our needs.
>>>>>>>>>
>>>>>>>> So the problem is, I don't think this can be made to work using
>>>>>>>> mapWithState. It doesn't allow a mapping function that emits infinite
>>>>>>>> output for an input element, directly or not.
>>>>>>>>
>>>>>>> So, provided there is an infinite input (eg pick a never ending
>>>>>>> queue stream), and each call produces a finite output, we would have an
>>>>>>> infinite number of calls.
>>>>>>>
>>>>>>>>
>>>>>>>> Dataflow and Flink, for example, had timer support even before
>>>>>>>> SDFs, and a timer can set another timer and thus end up doing an infinite
>>>>>>>> amount of work in a fault tolerant way - so SDF could be implemented on top
>>>>>>>> of that. But AFAIK spark doesn't have a similar feature, hence my concern.
>>>>>>>>
>>>>>>> So we can do an inifinite queue stream which would allow us to be
>>>>>>> triggered at each interval and handle our own persistence.
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>> But these still are both DStream based rather than Dataset which
>>>>>>>>> we might want to support (depends on what direction folks take with the
>>>>>>>>> runners).
>>>>>>>>>
>>>>>>>>> If we wanted to do this in the dataset world looking at a custom
>>>>>>>>> sink/source would also be an option, (which is effectively what a custom
>>>>>>>>> queue stream like thing for dstreams requires), but the datasource APIs are
>>>>>>>>> a bit influx so if we ended up doing things at the edge of what’s allowed
>>>>>>>>> there’s a good chance we’d have to rewrite it a few times.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>> Assuming that we have a given dstream though in Spark we can get
>>>>>>>>>>> the underlying RDD implementation for each microbatch and do our work
>>>>>>>>>>> inside of that.
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> More generally this does raise an important question if we
>>>>>>>>>>>>> want to target datasets instead of rdds/DStreams in which case i would need
>>>>>>>>>>>>> to do some more poking.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> How would timers be implemented? By outputing and
>>>>>>>>>>>>>>> reprocessing, the same way you proposed for SDF?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> i mean the timers could be inside the mappers within the
>>>>>>>>>>>>> system. Could use a singleton so if a partition is re-executed it doesn’t
>>>>>>>>>>>>> end up as a straggler.
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
>>>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has support
>>>>>>>>>>>>>>>>> for state)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <
>>>>>>>>>>>>>>>>> relax@google.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function to
>>>>>>>>>>>>>>>>>> keep track of the computation so far instead of outputting V each time?
>>>>>>>>>>>>>>>>>> (also the progress so far is probably of a different type R rather than V).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some sketchy thinking
>>>>>>>>>>>>>>>>>>> about this last year but didn't get very far.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>>>>>>>> and if the computation finishes T will be populated
>>>>>>>>>>>>>>>>>>> otherwise V will be
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> For determining how long to run we'd up to either K
>>>>>>>>>>>>>>>>>>> seconds or listen for a signal on a port
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Once we're done running we take the result and filter
>>>>>>>>>>>>>>>>>>> for the ones with T and V into seperate collections re-run until finished
>>>>>>>>>>>>>>>>>>> and then union the results
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>>>>>>>>> complicated and I figured terrible was a good place to start and improve
>>>>>>>>>>>>>>>>>>> from.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where
>>>>>>>>>>>>>>>>>>> this is worse than I remember because its been awhile since I thought about
>>>>>>>>>>>>>>>>>>> this.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>
>>>>>>>> --
>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>>>
>>>> --
>>>> Twitter: https://twitter.com/holdenkarau
>>>>
>>>

Re: Splittable DoFN in Spark discussion

Posted by Kenneth Knowles <kl...@google.com>.
I don't think I understand what the limitations of timers are that you are
referring to. FWIW I would say implementing other primitives like SDF is an
explicit non-goal for Beam state & timers.

I got lost at some point in this thread, but is it actually necessary that
a bounded PCollection maps to a finite/bounded structure in Spark?
Skimming, I'm not sure if the problem is that we can't transliterate Beam
to Spark (this might be a good sign) or that we can't express SDF style
computation at all (seems far-fetched, but I could be convinced). Does
doing a lightweight analysis and just promoting some things to be some kind
of infinite representation help?

Kenn

On Tue, Apr 24, 2018 at 2:37 PM Eugene Kirpichov <ki...@google.com>
wrote:

> Would like to revive this thread one more time.
>
> At this point I'm pretty certain that Spark can't support this out of the
> box and we're gonna have to make changes to Spark.
>
> Holden, could you advise who would be some Spark experts (yourself
> included :) ) who could advise what kind of Spark change would both support
> this AND be useful to the regular Spark community (non-Beam) so that it has
> a chance of finding support? E.g. is there any plan in Spark regarding
> adding timers similar to Flink's or Beam's timers, maybe we could help out
> with that?
>
> +Kenneth Knowles <kl...@google.com> because timers suffer from the same
> problem.
>
> On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> (resurrecting thread as I'm back from leave)
>>
>> I looked at this mode, and indeed as Reuven points out it seems that it
>> affects execution details, but doesn't offer any new APIs.
>> Holden - your suggestions of piggybacking an unbounded-per-element SDF on
>> top of an infinite stream would work if 1) there was just 1 element and 2)
>> the work was guaranteed to be infinite.
>>
>> Unfortunately, both of these assumptions are insufficient. In particular:
>>
>> - 1: The SDF is applied to a PCollection; the PCollection itself may be
>> unbounded; and the unbounded work done by the SDF happens for every
>> element. E.g. we might have a Kafka topic on which names of Kafka topics
>> arrive, and we may end up concurrently reading a continuously growing
>> number of topics.
>> - 2: The work per element is not necessarily infinite, it's just *not
>> guaranteed to be finite* - the SDF is allowed at any moment to say
>> "Okay, this restriction is done for real" by returning stop() from the
>> @ProcessElement method. Continuing the Kafka example, e.g., it could do
>> that if the topic/partition being watched is deleted. Having an infinite
>> stream as a driver of this process would require being able to send a
>> signal to the stream to stop itself.
>>
>> Is it looking like there's any other way this can be done in Spark as-is,
>> or are we going to have to make changes to Spark to support this?
>>
>> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <ho...@pigscanfly.ca>
>> wrote:
>>
>>> I mean the new mode is very much in the Dataset not the DStream API
>>> (although you can use the Dataset API with the old modes too).
>>>
>>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com> wrote:
>>>
>>>> But this new mode isn't a semantic change, right? It's moving away from
>>>> micro batches into something that looks a lot like what Flink does -
>>>> continuous processing with asynchronous snapshot boundaries.
>>>>
>>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <th...@apache.org> wrote:
>>>>
>>>>> Hopefully the new "continuous processing mode" in Spark will enable
>>>>> SDF implementation (and real streaming)?
>>>>>
>>>>> Thanks,
>>>>> Thomas
>>>>>
>>>>>
>>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <ho...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <
>>>>>> kirpichov@google.com> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <
>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>
>>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <
>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for
>>>>>>>>>>>>> Spark runner streaming. Holden, is it correct that Spark appears to have no
>>>>>>>>>>>>> way at all to produce an infinite DStream from a finite RDD? Maybe we can
>>>>>>>>>>>>> somehow dynamically create a new DStream for every initial restriction,
>>>>>>>>>>>>> said DStream being obtained using a Receiver that under the hood actually
>>>>>>>>>>>>> runs the SDF? (this is of course less efficient than a timer-capable runner
>>>>>>>>>>>>> would do, and I have doubts about the fault tolerance)
>>>>>>>>>>>>>
>>>>>>>>>>>> So on the streaming side we could simply do it with a fixed
>>>>>>>>>>>> number of levels on DStreams. It’s not great but it would work.
>>>>>>>>>>>>
>>>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF
>>>>>>>>>>> demands of the runner. Imagine the following case: a file contains a list
>>>>>>>>>>> of "master" Kafka topics, on which there are published additional Kafka
>>>>>>>>>>> topics to read.
>>>>>>>>>>>
>>>>>>>>>>> PCollection<String> masterTopics =
>>>>>>>>>>> TextIO.read().from(masterTopicsFile)
>>>>>>>>>>> PCollection<String> nestedTopics =
>>>>>>>>>>> masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>>> PCollection<String> records =
>>>>>>>>>>> nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>>>
>>>>>>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>>>>>>> infinite output for every input:
>>>>>>>>>>> - Applying it to a finite set of inputs (in this case to the
>>>>>>>>>>> result of reading a text file)
>>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an
>>>>>>>>>>> unbounded number of streams being read concurrently, each of the streams
>>>>>>>>>>> themselves is unbounded too)
>>>>>>>>>>>
>>>>>>>>>>> Does the multi-level solution you have in mind work for this
>>>>>>>>>>> case? I suppose the second case is harder, so we can focus on that.
>>>>>>>>>>>
>>>>>>>>>> So none of those are a splittabledofn right?
>>>>>>>>>>
>>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>>>>>>> splittable DoFn and we're trying to figure out how to make Spark run it.
>>>>>>>>>
>>>>>>>>>
>>>>>>>> Ah ok, sorry I saw that and for some reason parsed them as old
>>>>>>>> style DoFns in my head.
>>>>>>>>
>>>>>>>> To effectively allow us to union back into the “same” DStream  we’d
>>>>>>>> have to end up using Sparks queue streams (or their equivalent custom
>>>>>>>> source because of some queue stream limitations), which invites some
>>>>>>>> reliability challenges. This might be at the point where I should send a
>>>>>>>> diagram/some sample code since it’s a bit convoluted.
>>>>>>>>
>>>>>>>> The more I think about the jumps required to make the “simple”
>>>>>>>> union approach work, the more it seems just using the statemapping for
>>>>>>>> steaming is probably more reasonable. Although the state tracking in Spark
>>>>>>>> can be somewhat expensive so it would probably make sense to benchmark to
>>>>>>>> see if it meets our needs.
>>>>>>>>
>>>>>>> So the problem is, I don't think this can be made to work using
>>>>>>> mapWithState. It doesn't allow a mapping function that emits infinite
>>>>>>> output for an input element, directly or not.
>>>>>>>
>>>>>> So, provided there is an infinite input (eg pick a never ending queue
>>>>>> stream), and each call produces a finite output, we would have an infinite
>>>>>> number of calls.
>>>>>>
>>>>>>>
>>>>>>> Dataflow and Flink, for example, had timer support even before SDFs,
>>>>>>> and a timer can set another timer and thus end up doing an infinite amount
>>>>>>> of work in a fault tolerant way - so SDF could be implemented on top of
>>>>>>> that. But AFAIK spark doesn't have a similar feature, hence my concern.
>>>>>>>
>>>>>> So we can do an inifinite queue stream which would allow us to be
>>>>>> triggered at each interval and handle our own persistence.
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> But these still are both DStream based rather than Dataset which we
>>>>>>>> might want to support (depends on what direction folks take with the
>>>>>>>> runners).
>>>>>>>>
>>>>>>>> If we wanted to do this in the dataset world looking at a custom
>>>>>>>> sink/source would also be an option, (which is effectively what a custom
>>>>>>>> queue stream like thing for dstreams requires), but the datasource APIs are
>>>>>>>> a bit influx so if we ended up doing things at the edge of what’s allowed
>>>>>>>> there’s a good chance we’d have to rewrite it a few times.
>>>>>>>>
>>>>>>>>
>>>>>>>>>> Assuming that we have a given dstream though in Spark we can get
>>>>>>>>>> the underlying RDD implementation for each microbatch and do our work
>>>>>>>>>> inside of that.
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> More generally this does raise an important question if we want
>>>>>>>>>>>> to target datasets instead of rdds/DStreams in which case i would need to
>>>>>>>>>>>> do some more poking.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> How would timers be implemented? By outputing and
>>>>>>>>>>>>>> reprocessing, the same way you proposed for SDF?
>>>>>>>>>>>>>>
>>>>>>>>>>>>> i mean the timers could be inside the mappers within the
>>>>>>>>>>>> system. Could use a singleton so if a partition is re-executed it doesn’t
>>>>>>>>>>>> end up as a straggler.
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
>>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has support
>>>>>>>>>>>>>>>> for state)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <
>>>>>>>>>>>>>>>> relax@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function to
>>>>>>>>>>>>>>>>> keep track of the computation so far instead of outputting V each time?
>>>>>>>>>>>>>>>>> (also the progress so far is probably of a different type R rather than V).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some sketchy thinking
>>>>>>>>>>>>>>>>>> about this last year but didn't get very far.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>>>>>>> and if the computation finishes T will be populated
>>>>>>>>>>>>>>>>>> otherwise V will be
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> For determining how long to run we'd up to either K
>>>>>>>>>>>>>>>>>> seconds or listen for a signal on a port
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Once we're done running we take the result and filter for
>>>>>>>>>>>>>>>>>> the ones with T and V into seperate collections re-run until finished
>>>>>>>>>>>>>>>>>> and then union the results
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>>>>>>>> complicated and I figured terrible was a good place to start and improve
>>>>>>>>>>>>>>>>>> from.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where
>>>>>>>>>>>>>>>>>> this is worse than I remember because its been awhile since I thought about
>>>>>>>>>>>>>>>>>> this.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>
>>>>>>>>> --
>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>
>>>>>>> --
>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>
>>>>>
>>>>>
>>>
>>>
>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>

Re: Splittable DoFN in Spark discussion

Posted by Eugene Kirpichov <ki...@google.com>.
Would like to revive this thread one more time.

At this point I'm pretty certain that Spark can't support this out of the
box and we're gonna have to make changes to Spark.

Holden, could you advise who would be some Spark experts (yourself included
:) ) who could advise what kind of Spark change would both support this AND
be useful to the regular Spark community (non-Beam) so that it has a chance
of finding support? E.g. is there any plan in Spark regarding adding timers
similar to Flink's or Beam's timers, maybe we could help out with that?

+Kenneth Knowles <kl...@google.com> because timers suffer from the same
problem.

On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <ki...@google.com>
wrote:

> (resurrecting thread as I'm back from leave)
>
> I looked at this mode, and indeed as Reuven points out it seems that it
> affects execution details, but doesn't offer any new APIs.
> Holden - your suggestions of piggybacking an unbounded-per-element SDF on
> top of an infinite stream would work if 1) there was just 1 element and 2)
> the work was guaranteed to be infinite.
>
> Unfortunately, both of these assumptions are insufficient. In particular:
>
> - 1: The SDF is applied to a PCollection; the PCollection itself may be
> unbounded; and the unbounded work done by the SDF happens for every
> element. E.g. we might have a Kafka topic on which names of Kafka topics
> arrive, and we may end up concurrently reading a continuously growing
> number of topics.
> - 2: The work per element is not necessarily infinite, it's just *not
> guaranteed to be finite* - the SDF is allowed at any moment to say "Okay,
> this restriction is done for real" by returning stop() from the
> @ProcessElement method. Continuing the Kafka example, e.g., it could do
> that if the topic/partition being watched is deleted. Having an infinite
> stream as a driver of this process would require being able to send a
> signal to the stream to stop itself.
>
> Is it looking like there's any other way this can be done in Spark as-is,
> or are we going to have to make changes to Spark to support this?
>
> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <ho...@pigscanfly.ca> wrote:
>
>> I mean the new mode is very much in the Dataset not the DStream API
>> (although you can use the Dataset API with the old modes too).
>>
>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com> wrote:
>>
>>> But this new mode isn't a semantic change, right? It's moving away from
>>> micro batches into something that looks a lot like what Flink does -
>>> continuous processing with asynchronous snapshot boundaries.
>>>
>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <th...@apache.org> wrote:
>>>
>>>> Hopefully the new "continuous processing mode" in Spark will enable SDF
>>>> implementation (and real streaming)?
>>>>
>>>> Thanks,
>>>> Thomas
>>>>
>>>>
>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <ho...@pigscanfly.ca>
>>>> wrote:
>>>>
>>>>>
>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <ki...@google.com>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <
>>>>>>> kirpichov@google.com> wrote:
>>>>>>>
>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <
>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for
>>>>>>>>>>>> Spark runner streaming. Holden, is it correct that Spark appears to have no
>>>>>>>>>>>> way at all to produce an infinite DStream from a finite RDD? Maybe we can
>>>>>>>>>>>> somehow dynamically create a new DStream for every initial restriction,
>>>>>>>>>>>> said DStream being obtained using a Receiver that under the hood actually
>>>>>>>>>>>> runs the SDF? (this is of course less efficient than a timer-capable runner
>>>>>>>>>>>> would do, and I have doubts about the fault tolerance)
>>>>>>>>>>>>
>>>>>>>>>>> So on the streaming side we could simply do it with a fixed
>>>>>>>>>>> number of levels on DStreams. It’s not great but it would work.
>>>>>>>>>>>
>>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF
>>>>>>>>>> demands of the runner. Imagine the following case: a file contains a list
>>>>>>>>>> of "master" Kafka topics, on which there are published additional Kafka
>>>>>>>>>> topics to read.
>>>>>>>>>>
>>>>>>>>>> PCollection<String> masterTopics =
>>>>>>>>>> TextIO.read().from(masterTopicsFile)
>>>>>>>>>> PCollection<String> nestedTopics =
>>>>>>>>>> masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>> PCollection<String> records =
>>>>>>>>>> nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>>
>>>>>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>>>>>> infinite output for every input:
>>>>>>>>>> - Applying it to a finite set of inputs (in this case to the
>>>>>>>>>> result of reading a text file)
>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an
>>>>>>>>>> unbounded number of streams being read concurrently, each of the streams
>>>>>>>>>> themselves is unbounded too)
>>>>>>>>>>
>>>>>>>>>> Does the multi-level solution you have in mind work for this
>>>>>>>>>> case? I suppose the second case is harder, so we can focus on that.
>>>>>>>>>>
>>>>>>>>> So none of those are a splittabledofn right?
>>>>>>>>>
>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>>>>>> splittable DoFn and we're trying to figure out how to make Spark run it.
>>>>>>>>
>>>>>>>>
>>>>>>> Ah ok, sorry I saw that and for some reason parsed them as old style
>>>>>>> DoFns in my head.
>>>>>>>
>>>>>>> To effectively allow us to union back into the “same” DStream  we’d
>>>>>>> have to end up using Sparks queue streams (or their equivalent custom
>>>>>>> source because of some queue stream limitations), which invites some
>>>>>>> reliability challenges. This might be at the point where I should send a
>>>>>>> diagram/some sample code since it’s a bit convoluted.
>>>>>>>
>>>>>>> The more I think about the jumps required to make the “simple” union
>>>>>>> approach work, the more it seems just using the statemapping for steaming
>>>>>>> is probably more reasonable. Although the state tracking in Spark can be
>>>>>>> somewhat expensive so it would probably make sense to benchmark to see if
>>>>>>> it meets our needs.
>>>>>>>
>>>>>> So the problem is, I don't think this can be made to work using
>>>>>> mapWithState. It doesn't allow a mapping function that emits infinite
>>>>>> output for an input element, directly or not.
>>>>>>
>>>>> So, provided there is an infinite input (eg pick a never ending queue
>>>>> stream), and each call produces a finite output, we would have an infinite
>>>>> number of calls.
>>>>>
>>>>>>
>>>>>> Dataflow and Flink, for example, had timer support even before SDFs,
>>>>>> and a timer can set another timer and thus end up doing an infinite amount
>>>>>> of work in a fault tolerant way - so SDF could be implemented on top of
>>>>>> that. But AFAIK spark doesn't have a similar feature, hence my concern.
>>>>>>
>>>>> So we can do an inifinite queue stream which would allow us to be
>>>>> triggered at each interval and handle our own persistence.
>>>>>
>>>>>>
>>>>>>
>>>>>>> But these still are both DStream based rather than Dataset which we
>>>>>>> might want to support (depends on what direction folks take with the
>>>>>>> runners).
>>>>>>>
>>>>>>> If we wanted to do this in the dataset world looking at a custom
>>>>>>> sink/source would also be an option, (which is effectively what a custom
>>>>>>> queue stream like thing for dstreams requires), but the datasource APIs are
>>>>>>> a bit influx so if we ended up doing things at the edge of what’s allowed
>>>>>>> there’s a good chance we’d have to rewrite it a few times.
>>>>>>>
>>>>>>>
>>>>>>>>> Assuming that we have a given dstream though in Spark we can get
>>>>>>>>> the underlying RDD implementation for each microbatch and do our work
>>>>>>>>> inside of that.
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> More generally this does raise an important question if we want
>>>>>>>>>>> to target datasets instead of rdds/DStreams in which case i would need to
>>>>>>>>>>> do some more poking.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> How would timers be implemented? By outputing and
>>>>>>>>>>>>> reprocessing, the same way you proposed for SDF?
>>>>>>>>>>>>>
>>>>>>>>>>>> i mean the timers could be inside the mappers within the
>>>>>>>>>>> system. Could use a singleton so if a partition is re-executed it doesn’t
>>>>>>>>>>> end up as a straggler.
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has support
>>>>>>>>>>>>>>> for state)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function to keep
>>>>>>>>>>>>>>>> track of the computation so far instead of outputting V each time? (also
>>>>>>>>>>>>>>>> the progress so far is probably of a different type R rather than V).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some sketchy thinking
>>>>>>>>>>>>>>>>> about this last year but didn't get very far.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>>>>>> and if the computation finishes T will be populated
>>>>>>>>>>>>>>>>> otherwise V will be
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> For determining how long to run we'd up to either K
>>>>>>>>>>>>>>>>> seconds or listen for a signal on a port
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Once we're done running we take the result and filter for
>>>>>>>>>>>>>>>>> the ones with T and V into seperate collections re-run until finished
>>>>>>>>>>>>>>>>> and then union the results
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>>>>>>> complicated and I figured terrible was a good place to start and improve
>>>>>>>>>>>>>>>>> from.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where this
>>>>>>>>>>>>>>>>> is worse than I remember because its been awhile since I thought about this.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>
>>>>>>>> --
>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>
>>>>>> --
>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>
>>>>
>>>>
>>
>>
>> --
>> Twitter: https://twitter.com/holdenkarau
>>
>

Re: Splittable DoFN in Spark discussion

Posted by Eugene Kirpichov <ki...@google.com>.
(resurrecting thread as I'm back from leave)

I looked at this mode, and indeed as Reuven points out it seems that it
affects execution details, but doesn't offer any new APIs.
Holden - your suggestions of piggybacking an unbounded-per-element SDF on
top of an infinite stream would work if 1) there was just 1 element and 2)
the work was guaranteed to be infinite.

Unfortunately, both of these assumptions are insufficient. In particular:

- 1: The SDF is applied to a PCollection; the PCollection itself may be
unbounded; and the unbounded work done by the SDF happens for every
element. E.g. we might have a Kafka topic on which names of Kafka topics
arrive, and we may end up concurrently reading a continuously growing
number of topics.
- 2: The work per element is not necessarily infinite, it's just *not
guaranteed to be finite* - the SDF is allowed at any moment to say "Okay,
this restriction is done for real" by returning stop() from the
@ProcessElement method. Continuing the Kafka example, e.g., it could do
that if the topic/partition being watched is deleted. Having an infinite
stream as a driver of this process would require being able to send a
signal to the stream to stop itself.

Is it looking like there's any other way this can be done in Spark as-is,
or are we going to have to make changes to Spark to support this?

On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <ho...@pigscanfly.ca> wrote:

> I mean the new mode is very much in the Dataset not the DStream API
> (although you can use the Dataset API with the old modes too).
>
> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com> wrote:
>
>> But this new mode isn't a semantic change, right? It's moving away from
>> micro batches into something that looks a lot like what Flink does -
>> continuous processing with asynchronous snapshot boundaries.
>>
>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <th...@apache.org> wrote:
>>
>>> Hopefully the new "continuous processing mode" in Spark will enable SDF
>>> implementation (and real streaming)?
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <ho...@pigscanfly.ca>
>>> wrote:
>>>
>>>>
>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <ho...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <
>>>>>> kirpichov@google.com> wrote:
>>>>>>
>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>
>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark
>>>>>>>>>>> runner streaming. Holden, is it correct that Spark appears to have no way
>>>>>>>>>>> at all to produce an infinite DStream from a finite RDD? Maybe we can
>>>>>>>>>>> somehow dynamically create a new DStream for every initial restriction,
>>>>>>>>>>> said DStream being obtained using a Receiver that under the hood actually
>>>>>>>>>>> runs the SDF? (this is of course less efficient than a timer-capable runner
>>>>>>>>>>> would do, and I have doubts about the fault tolerance)
>>>>>>>>>>>
>>>>>>>>>> So on the streaming side we could simply do it with a fixed
>>>>>>>>>> number of levels on DStreams. It’s not great but it would work.
>>>>>>>>>>
>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF demands
>>>>>>>>> of the runner. Imagine the following case: a file contains a list of
>>>>>>>>> "master" Kafka topics, on which there are published additional Kafka topics
>>>>>>>>> to read.
>>>>>>>>>
>>>>>>>>> PCollection<String> masterTopics =
>>>>>>>>> TextIO.read().from(masterTopicsFile)
>>>>>>>>> PCollection<String> nestedTopics =
>>>>>>>>> masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>> PCollection<String> records =
>>>>>>>>> nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>
>>>>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>>>>> infinite output for every input:
>>>>>>>>> - Applying it to a finite set of inputs (in this case to the
>>>>>>>>> result of reading a text file)
>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an
>>>>>>>>> unbounded number of streams being read concurrently, each of the streams
>>>>>>>>> themselves is unbounded too)
>>>>>>>>>
>>>>>>>>> Does the multi-level solution you have in mind work for this case?
>>>>>>>>> I suppose the second case is harder, so we can focus on that.
>>>>>>>>>
>>>>>>>> So none of those are a splittabledofn right?
>>>>>>>>
>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>>>>> splittable DoFn and we're trying to figure out how to make Spark run it.
>>>>>>>
>>>>>>>
>>>>>> Ah ok, sorry I saw that and for some reason parsed them as old style
>>>>>> DoFns in my head.
>>>>>>
>>>>>> To effectively allow us to union back into the “same” DStream  we’d
>>>>>> have to end up using Sparks queue streams (or their equivalent custom
>>>>>> source because of some queue stream limitations), which invites some
>>>>>> reliability challenges. This might be at the point where I should send a
>>>>>> diagram/some sample code since it’s a bit convoluted.
>>>>>>
>>>>>> The more I think about the jumps required to make the “simple” union
>>>>>> approach work, the more it seems just using the statemapping for steaming
>>>>>> is probably more reasonable. Although the state tracking in Spark can be
>>>>>> somewhat expensive so it would probably make sense to benchmark to see if
>>>>>> it meets our needs.
>>>>>>
>>>>> So the problem is, I don't think this can be made to work using
>>>>> mapWithState. It doesn't allow a mapping function that emits infinite
>>>>> output for an input element, directly or not.
>>>>>
>>>> So, provided there is an infinite input (eg pick a never ending queue
>>>> stream), and each call produces a finite output, we would have an infinite
>>>> number of calls.
>>>>
>>>>>
>>>>> Dataflow and Flink, for example, had timer support even before SDFs,
>>>>> and a timer can set another timer and thus end up doing an infinite amount
>>>>> of work in a fault tolerant way - so SDF could be implemented on top of
>>>>> that. But AFAIK spark doesn't have a similar feature, hence my concern.
>>>>>
>>>> So we can do an inifinite queue stream which would allow us to be
>>>> triggered at each interval and handle our own persistence.
>>>>
>>>>>
>>>>>
>>>>>> But these still are both DStream based rather than Dataset which we
>>>>>> might want to support (depends on what direction folks take with the
>>>>>> runners).
>>>>>>
>>>>>> If we wanted to do this in the dataset world looking at a custom
>>>>>> sink/source would also be an option, (which is effectively what a custom
>>>>>> queue stream like thing for dstreams requires), but the datasource APIs are
>>>>>> a bit influx so if we ended up doing things at the edge of what’s allowed
>>>>>> there’s a good chance we’d have to rewrite it a few times.
>>>>>>
>>>>>>
>>>>>>>> Assuming that we have a given dstream though in Spark we can get
>>>>>>>> the underlying RDD implementation for each microbatch and do our work
>>>>>>>> inside of that.
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> More generally this does raise an important question if we want
>>>>>>>>>> to target datasets instead of rdds/DStreams in which case i would need to
>>>>>>>>>> do some more poking.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> How would timers be implemented? By outputing and reprocessing,
>>>>>>>>>>>> the same way you proposed for SDF?
>>>>>>>>>>>>
>>>>>>>>>>> i mean the timers could be inside the mappers within the system.
>>>>>>>>>> Could use a singleton so if a partition is re-executed it doesn’t end up as
>>>>>>>>>> a straggler.
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has support
>>>>>>>>>>>>>> for state)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Could we alternatively use a state mapping function to keep
>>>>>>>>>>>>>>> track of the computation so far instead of outputting V each time? (also
>>>>>>>>>>>>>>> the progress so far is probably of a different type R rather than V).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some sketchy thinking
>>>>>>>>>>>>>>>> about this last year but didn't get very far.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>>>>> and if the computation finishes T will be populated
>>>>>>>>>>>>>>>> otherwise V will be
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For determining how long to run we'd up to either K seconds
>>>>>>>>>>>>>>>> or listen for a signal on a port
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Once we're done running we take the result and filter for
>>>>>>>>>>>>>>>> the ones with T and V into seperate collections re-run until finished
>>>>>>>>>>>>>>>> and then union the results
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>>>>>> complicated and I figured terrible was a good place to start and improve
>>>>>>>>>>>>>>>> from.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where this
>>>>>>>>>>>>>>>> is worse than I remember because its been awhile since I thought about this.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>
>>>>>>>>> --
>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>
>>>>>>> --
>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>
>>>>> --
>>>> Twitter: https://twitter.com/holdenkarau
>>>>
>>>
>>>
>
>
> --
> Twitter: https://twitter.com/holdenkarau
>

Re: Splittable DoFN in Spark discussion

Posted by Holden Karau <ho...@pigscanfly.ca>.
I mean the new mode is very much in the Dataset not the DStream API
(although you can use the Dataset API with the old modes too).

On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com> wrote:

> But this new mode isn't a semantic change, right? It's moving away from
> micro batches into something that looks a lot like what Flink does -
> continuous processing with asynchronous snapshot boundaries.
>
> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <th...@apache.org> wrote:
>
>> Hopefully the new "continuous processing mode" in Spark will enable SDF
>> implementation (and real streaming)?
>>
>> Thanks,
>> Thomas
>>
>>
>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <ho...@pigscanfly.ca>
>> wrote:
>>
>>>
>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>>
>>>>
>>>>
>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <ho...@pigscanfly.ca>
>>>> wrote:
>>>>
>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <ki...@google.com>
>>>>> wrote:
>>>>>
>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
>>>>>>> kirpichov@google.com> wrote:
>>>>>>>
>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark
>>>>>>>>>> runner streaming. Holden, is it correct that Spark appears to have no way
>>>>>>>>>> at all to produce an infinite DStream from a finite RDD? Maybe we can
>>>>>>>>>> somehow dynamically create a new DStream for every initial restriction,
>>>>>>>>>> said DStream being obtained using a Receiver that under the hood actually
>>>>>>>>>> runs the SDF? (this is of course less efficient than a timer-capable runner
>>>>>>>>>> would do, and I have doubts about the fault tolerance)
>>>>>>>>>>
>>>>>>>>> So on the streaming side we could simply do it with a fixed number
>>>>>>>>> of levels on DStreams. It’s not great but it would work.
>>>>>>>>>
>>>>>>>> Not sure I understand this. Let me try to clarify what SDF demands
>>>>>>>> of the runner. Imagine the following case: a file contains a list of
>>>>>>>> "master" Kafka topics, on which there are published additional Kafka topics
>>>>>>>> to read.
>>>>>>>>
>>>>>>>> PCollection<String> masterTopics = TextIO.read().from(
>>>>>>>> masterTopicsFile)
>>>>>>>> PCollection<String> nestedTopics = masterTopics.apply(ParDo(
>>>>>>>> ReadFromKafkaFn))
>>>>>>>> PCollection<String> records = nestedTopics.apply(ParDo(
>>>>>>>> ReadFromKafkaFn))
>>>>>>>>
>>>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>>>> infinite output for every input:
>>>>>>>> - Applying it to a finite set of inputs (in this case to the result
>>>>>>>> of reading a text file)
>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an
>>>>>>>> unbounded number of streams being read concurrently, each of the streams
>>>>>>>> themselves is unbounded too)
>>>>>>>>
>>>>>>>> Does the multi-level solution you have in mind work for this case?
>>>>>>>> I suppose the second case is harder, so we can focus on that.
>>>>>>>>
>>>>>>> So none of those are a splittabledofn right?
>>>>>>>
>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>>>> splittable DoFn and we're trying to figure out how to make Spark run it.
>>>>>>
>>>>>>
>>>>> Ah ok, sorry I saw that and for some reason parsed them as old style
>>>>> DoFns in my head.
>>>>>
>>>>> To effectively allow us to union back into the “same” DStream  we’d
>>>>> have to end up using Sparks queue streams (or their equivalent custom
>>>>> source because of some queue stream limitations), which invites some
>>>>> reliability challenges. This might be at the point where I should send a
>>>>> diagram/some sample code since it’s a bit convoluted.
>>>>>
>>>>> The more I think about the jumps required to make the “simple” union
>>>>> approach work, the more it seems just using the statemapping for steaming
>>>>> is probably more reasonable. Although the state tracking in Spark can be
>>>>> somewhat expensive so it would probably make sense to benchmark to see if
>>>>> it meets our needs.
>>>>>
>>>> So the problem is, I don't think this can be made to work using
>>>> mapWithState. It doesn't allow a mapping function that emits infinite
>>>> output for an input element, directly or not.
>>>>
>>> So, provided there is an infinite input (eg pick a never ending queue
>>> stream), and each call produces a finite output, we would have an infinite
>>> number of calls.
>>>
>>>>
>>>> Dataflow and Flink, for example, had timer support even before SDFs,
>>>> and a timer can set another timer and thus end up doing an infinite amount
>>>> of work in a fault tolerant way - so SDF could be implemented on top of
>>>> that. But AFAIK spark doesn't have a similar feature, hence my concern.
>>>>
>>> So we can do an inifinite queue stream which would allow us to be
>>> triggered at each interval and handle our own persistence.
>>>
>>>>
>>>>
>>>>> But these still are both DStream based rather than Dataset which we
>>>>> might want to support (depends on what direction folks take with the
>>>>> runners).
>>>>>
>>>>> If we wanted to do this in the dataset world looking at a custom
>>>>> sink/source would also be an option, (which is effectively what a custom
>>>>> queue stream like thing for dstreams requires), but the datasource APIs are
>>>>> a bit influx so if we ended up doing things at the edge of what’s allowed
>>>>> there’s a good chance we’d have to rewrite it a few times.
>>>>>
>>>>>
>>>>>>> Assuming that we have a given dstream though in Spark we can get the
>>>>>>> underlying RDD implementation for each microbatch and do our work inside of
>>>>>>> that.
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> More generally this does raise an important question if we want to
>>>>>>>>> target datasets instead of rdds/DStreams in which case i would need to do
>>>>>>>>> some more poking.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> How would timers be implemented? By outputing and reprocessing,
>>>>>>>>>>> the same way you proposed for SDF?
>>>>>>>>>>>
>>>>>>>>>> i mean the timers could be inside the mappers within the system.
>>>>>>>>> Could use a singleton so if a partition is re-executed it doesn’t end up as
>>>>>>>>> a straggler.
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Does Spark have support for timers? (I know it has support for
>>>>>>>>>>>>> state)
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Could we alternatively use a state mapping function to keep
>>>>>>>>>>>>>> track of the computation so far instead of outputting V each time? (also
>>>>>>>>>>>>>> the progress so far is probably of a different type R rather than V).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some sketchy thinking
>>>>>>>>>>>>>>> about this last year but didn't get very far.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>>>> and if the computation finishes T will be populated
>>>>>>>>>>>>>>> otherwise V will be
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For determining how long to run we'd up to either K seconds
>>>>>>>>>>>>>>> or listen for a signal on a port
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Once we're done running we take the result and filter for
>>>>>>>>>>>>>>> the ones with T and V into seperate collections re-run until finished
>>>>>>>>>>>>>>> and then union the results
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>>>>> complicated and I figured terrible was a good place to start and improve
>>>>>>>>>>>>>>> from.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where this
>>>>>>>>>>>>>>> is worse than I remember because its been awhile since I thought about this.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>
>>>>>>>> --
>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>
>>>>>> --
>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>
>>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>
>>


-- 
Twitter: https://twitter.com/holdenkarau

Re: Splittable DoFN in Spark discussion

Posted by Reuven Lax <re...@google.com>.
But this new mode isn't a semantic change, right? It's moving away from
micro batches into something that looks a lot like what Flink does -
continuous processing with asynchronous snapshot boundaries.

On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <th...@apache.org> wrote:

> Hopefully the new "continuous processing mode" in Spark will enable SDF
> implementation (and real streaming)?
>
> Thanks,
> Thomas
>
>
> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <ho...@pigscanfly.ca>
> wrote:
>
>>
>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <ho...@pigscanfly.ca>
>>> wrote:
>>>
>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>>
>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <ho...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
>>>>>> kirpichov@google.com> wrote:
>>>>>>
>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>
>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark
>>>>>>>>> runner streaming. Holden, is it correct that Spark appears to have no way
>>>>>>>>> at all to produce an infinite DStream from a finite RDD? Maybe we can
>>>>>>>>> somehow dynamically create a new DStream for every initial restriction,
>>>>>>>>> said DStream being obtained using a Receiver that under the hood actually
>>>>>>>>> runs the SDF? (this is of course less efficient than a timer-capable runner
>>>>>>>>> would do, and I have doubts about the fault tolerance)
>>>>>>>>>
>>>>>>>> So on the streaming side we could simply do it with a fixed number
>>>>>>>> of levels on DStreams. It’s not great but it would work.
>>>>>>>>
>>>>>>> Not sure I understand this. Let me try to clarify what SDF demands
>>>>>>> of the runner. Imagine the following case: a file contains a list of
>>>>>>> "master" Kafka topics, on which there are published additional Kafka topics
>>>>>>> to read.
>>>>>>>
>>>>>>> PCollection<String> masterTopics =
>>>>>>> TextIO.read().from(masterTopicsFile)
>>>>>>> PCollection<String> nestedTopics =
>>>>>>> masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>> PCollection<String> records =
>>>>>>> nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>
>>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>>> infinite output for every input:
>>>>>>> - Applying it to a finite set of inputs (in this case to the result
>>>>>>> of reading a text file)
>>>>>>> - Applying it to an infinite set of inputs (i.e. having an unbounded
>>>>>>> number of streams being read concurrently, each of the streams themselves
>>>>>>> is unbounded too)
>>>>>>>
>>>>>>> Does the multi-level solution you have in mind work for this case? I
>>>>>>> suppose the second case is harder, so we can focus on that.
>>>>>>>
>>>>>> So none of those are a splittabledofn right?
>>>>>>
>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>>> splittable DoFn and we're trying to figure out how to make Spark run it.
>>>>>
>>>>>
>>>> Ah ok, sorry I saw that and for some reason parsed them as old style
>>>> DoFns in my head.
>>>>
>>>> To effectively allow us to union back into the “same” DStream  we’d
>>>> have to end up using Sparks queue streams (or their equivalent custom
>>>> source because of some queue stream limitations), which invites some
>>>> reliability challenges. This might be at the point where I should send a
>>>> diagram/some sample code since it’s a bit convoluted.
>>>>
>>>> The more I think about the jumps required to make the “simple” union
>>>> approach work, the more it seems just using the statemapping for steaming
>>>> is probably more reasonable. Although the state tracking in Spark can be
>>>> somewhat expensive so it would probably make sense to benchmark to see if
>>>> it meets our needs.
>>>>
>>> So the problem is, I don't think this can be made to work using
>>> mapWithState. It doesn't allow a mapping function that emits infinite
>>> output for an input element, directly or not.
>>>
>> So, provided there is an infinite input (eg pick a never ending queue
>> stream), and each call produces a finite output, we would have an infinite
>> number of calls.
>>
>>>
>>> Dataflow and Flink, for example, had timer support even before SDFs, and
>>> a timer can set another timer and thus end up doing an infinite amount of
>>> work in a fault tolerant way - so SDF could be implemented on top of that.
>>> But AFAIK spark doesn't have a similar feature, hence my concern.
>>>
>> So we can do an inifinite queue stream which would allow us to be
>> triggered at each interval and handle our own persistence.
>>
>>>
>>>
>>>> But these still are both DStream based rather than Dataset which we
>>>> might want to support (depends on what direction folks take with the
>>>> runners).
>>>>
>>>> If we wanted to do this in the dataset world looking at a custom
>>>> sink/source would also be an option, (which is effectively what a custom
>>>> queue stream like thing for dstreams requires), but the datasource APIs are
>>>> a bit influx so if we ended up doing things at the edge of what’s allowed
>>>> there’s a good chance we’d have to rewrite it a few times.
>>>>
>>>>
>>>>>> Assuming that we have a given dstream though in Spark we can get the
>>>>>> underlying RDD implementation for each microbatch and do our work inside of
>>>>>> that.
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> More generally this does raise an important question if we want to
>>>>>>>> target datasets instead of rdds/DStreams in which case i would need to do
>>>>>>>> some more poking.
>>>>>>>>
>>>>>>>>
>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> How would timers be implemented? By outputing and reprocessing,
>>>>>>>>>> the same way you proposed for SDF?
>>>>>>>>>>
>>>>>>>>> i mean the timers could be inside the mappers within the system.
>>>>>>>> Could use a singleton so if a partition is re-executed it doesn’t end up as
>>>>>>>> a straggler.
>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>
>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Does Spark have support for timers? (I know it has support for
>>>>>>>>>>>> state)
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Could we alternatively use a state mapping function to keep
>>>>>>>>>>>>> track of the computation so far instead of outputting V each time? (also
>>>>>>>>>>>>> the progress so far is probably of a different type R rather than V).
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some sketchy thinking
>>>>>>>>>>>>>> about this last year but didn't get very far.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>>> and if the computation finishes T will be populated otherwise
>>>>>>>>>>>>>> V will be
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For determining how long to run we'd up to either K seconds
>>>>>>>>>>>>>> or listen for a signal on a port
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Once we're done running we take the result and filter for the
>>>>>>>>>>>>>> ones with T and V into seperate collections re-run until finished
>>>>>>>>>>>>>> and then union the results
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>>>> complicated and I figured terrible was a good place to start and improve
>>>>>>>>>>>>>> from.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where this is
>>>>>>>>>>>>>> worse than I remember because its been awhile since I thought about this.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>
>>>>>>> --
>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>
>>>>> --
>>>> Twitter: https://twitter.com/holdenkarau
>>>>
>>> --
>> Twitter: https://twitter.com/holdenkarau
>>
>
>

Re: Splittable DoFN in Spark discussion

Posted by Thomas Weise <th...@apache.org>.
Hopefully the new "continuous processing mode" in Spark will enable SDF
implementation (and real streaming)?

Thanks,
Thomas


On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <ho...@pigscanfly.ca> wrote:

>
> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>>
>>
>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <ho...@pigscanfly.ca> wrote:
>>
>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>>
>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <ho...@pigscanfly.ca>
>>>> wrote:
>>>>
>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <ki...@google.com>
>>>>> wrote:
>>>>>
>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>>> kirpichov@google.com> wrote:
>>>>>>>
>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark
>>>>>>>> runner streaming. Holden, is it correct that Spark appears to have no way
>>>>>>>> at all to produce an infinite DStream from a finite RDD? Maybe we can
>>>>>>>> somehow dynamically create a new DStream for every initial restriction,
>>>>>>>> said DStream being obtained using a Receiver that under the hood actually
>>>>>>>> runs the SDF? (this is of course less efficient than a timer-capable runner
>>>>>>>> would do, and I have doubts about the fault tolerance)
>>>>>>>>
>>>>>>> So on the streaming side we could simply do it with a fixed number
>>>>>>> of levels on DStreams. It’s not great but it would work.
>>>>>>>
>>>>>> Not sure I understand this. Let me try to clarify what SDF demands of
>>>>>> the runner. Imagine the following case: a file contains a list of "master"
>>>>>> Kafka topics, on which there are published additional Kafka topics to read.
>>>>>>
>>>>>> PCollection<String> masterTopics = TextIO.read().from(
>>>>>> masterTopicsFile)
>>>>>> PCollection<String> nestedTopics = masterTopics.apply(ParDo(
>>>>>> ReadFromKafkaFn))
>>>>>> PCollection<String> records = nestedTopics.apply(ParDo(
>>>>>> ReadFromKafkaFn))
>>>>>>
>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>> infinite output for every input:
>>>>>> - Applying it to a finite set of inputs (in this case to the result
>>>>>> of reading a text file)
>>>>>> - Applying it to an infinite set of inputs (i.e. having an unbounded
>>>>>> number of streams being read concurrently, each of the streams themselves
>>>>>> is unbounded too)
>>>>>>
>>>>>> Does the multi-level solution you have in mind work for this case? I
>>>>>> suppose the second case is harder, so we can focus on that.
>>>>>>
>>>>> So none of those are a splittabledofn right?
>>>>>
>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>> splittable DoFn and we're trying to figure out how to make Spark run it.
>>>>
>>>>
>>> Ah ok, sorry I saw that and for some reason parsed them as old style
>>> DoFns in my head.
>>>
>>> To effectively allow us to union back into the “same” DStream  we’d have
>>> to end up using Sparks queue streams (or their equivalent custom source
>>> because of some queue stream limitations), which invites some reliability
>>> challenges. This might be at the point where I should send a diagram/some
>>> sample code since it’s a bit convoluted.
>>>
>>> The more I think about the jumps required to make the “simple” union
>>> approach work, the more it seems just using the statemapping for steaming
>>> is probably more reasonable. Although the state tracking in Spark can be
>>> somewhat expensive so it would probably make sense to benchmark to see if
>>> it meets our needs.
>>>
>> So the problem is, I don't think this can be made to work using
>> mapWithState. It doesn't allow a mapping function that emits infinite
>> output for an input element, directly or not.
>>
> So, provided there is an infinite input (eg pick a never ending queue
> stream), and each call produces a finite output, we would have an infinite
> number of calls.
>
>>
>> Dataflow and Flink, for example, had timer support even before SDFs, and
>> a timer can set another timer and thus end up doing an infinite amount of
>> work in a fault tolerant way - so SDF could be implemented on top of that.
>> But AFAIK spark doesn't have a similar feature, hence my concern.
>>
> So we can do an inifinite queue stream which would allow us to be
> triggered at each interval and handle our own persistence.
>
>>
>>
>>> But these still are both DStream based rather than Dataset which we
>>> might want to support (depends on what direction folks take with the
>>> runners).
>>>
>>> If we wanted to do this in the dataset world looking at a custom
>>> sink/source would also be an option, (which is effectively what a custom
>>> queue stream like thing for dstreams requires), but the datasource APIs are
>>> a bit influx so if we ended up doing things at the edge of what’s allowed
>>> there’s a good chance we’d have to rewrite it a few times.
>>>
>>>
>>>>> Assuming that we have a given dstream though in Spark we can get the
>>>>> underlying RDD implementation for each microbatch and do our work inside of
>>>>> that.
>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> More generally this does raise an important question if we want to
>>>>>>> target datasets instead of rdds/DStreams in which case i would need to do
>>>>>>> some more poking.
>>>>>>>
>>>>>>>
>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> How would timers be implemented? By outputing and reprocessing,
>>>>>>>>> the same way you proposed for SDF?
>>>>>>>>>
>>>>>>>> i mean the timers could be inside the mappers within the system.
>>>>>>> Could use a singleton so if a partition is re-executed it doesn’t end up as
>>>>>>> a straggler.
>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Does Spark have support for timers? (I know it has support for
>>>>>>>>>>> state)
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Could we alternatively use a state mapping function to keep
>>>>>>>>>>>> track of the computation so far instead of outputting V each time? (also
>>>>>>>>>>>> the progress so far is probably of a different type R rather than V).
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some sketchy thinking
>>>>>>>>>>>>> about this last year but didn't get very far.
>>>>>>>>>>>>>
>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>
>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>> and if the computation finishes T will be populated otherwise
>>>>>>>>>>>>> V will be
>>>>>>>>>>>>>
>>>>>>>>>>>>> For determining how long to run we'd up to either K seconds or
>>>>>>>>>>>>> listen for a signal on a port
>>>>>>>>>>>>>
>>>>>>>>>>>>> Once we're done running we take the result and filter for the
>>>>>>>>>>>>> ones with T and V into seperate collections re-run until finished
>>>>>>>>>>>>> and then union the results
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>>> complicated and I figured terrible was a good place to start and improve
>>>>>>>>>>>>> from.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Let me know your thoughts, especially the parts where this is
>>>>>>>>>>>>> worse than I remember because its been awhile since I thought about this.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>
>>>>>>>>> --
>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>
>>>>>> --
>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>
>>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>>
>> --
> Twitter: https://twitter.com/holdenkarau
>

Re: Splittable DoFN in Spark discussion

Posted by Holden Karau <ho...@pigscanfly.ca>.
On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <ki...@google.com>
wrote:

>
>
> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <ho...@pigscanfly.ca> wrote:
>
>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <ho...@pigscanfly.ca>
>>> wrote:
>>>
>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>>
>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <ho...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>> kirpichov@google.com> wrote:
>>>>>>
>>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark
>>>>>>> runner streaming. Holden, is it correct that Spark appears to have no way
>>>>>>> at all to produce an infinite DStream from a finite RDD? Maybe we can
>>>>>>> somehow dynamically create a new DStream for every initial restriction,
>>>>>>> said DStream being obtained using a Receiver that under the hood actually
>>>>>>> runs the SDF? (this is of course less efficient than a timer-capable runner
>>>>>>> would do, and I have doubts about the fault tolerance)
>>>>>>>
>>>>>> So on the streaming side we could simply do it with a fixed number of
>>>>>> levels on DStreams. It’s not great but it would work.
>>>>>>
>>>>> Not sure I understand this. Let me try to clarify what SDF demands of
>>>>> the runner. Imagine the following case: a file contains a list of "master"
>>>>> Kafka topics, on which there are published additional Kafka topics to read.
>>>>>
>>>>> PCollection<String> masterTopics = TextIO.read().from(masterTopicsFile)
>>>>> PCollection<String> nestedTopics =
>>>>> masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>>> PCollection<String> records =
>>>>> nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>
>>>>> This exemplifies both use cases of a streaming SDF that emits infinite
>>>>> output for every input:
>>>>> - Applying it to a finite set of inputs (in this case to the result of
>>>>> reading a text file)
>>>>> - Applying it to an infinite set of inputs (i.e. having an unbounded
>>>>> number of streams being read concurrently, each of the streams themselves
>>>>> is unbounded too)
>>>>>
>>>>> Does the multi-level solution you have in mind work for this case? I
>>>>> suppose the second case is harder, so we can focus on that.
>>>>>
>>>> So none of those are a splittabledofn right?
>>>>
>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>> splittable DoFn and we're trying to figure out how to make Spark run it.
>>>
>>>
>> Ah ok, sorry I saw that and for some reason parsed them as old style
>> DoFns in my head.
>>
>> To effectively allow us to union back into the “same” DStream  we’d have
>> to end up using Sparks queue streams (or their equivalent custom source
>> because of some queue stream limitations), which invites some reliability
>> challenges. This might be at the point where I should send a diagram/some
>> sample code since it’s a bit convoluted.
>>
>> The more I think about the jumps required to make the “simple” union
>> approach work, the more it seems just using the statemapping for steaming
>> is probably more reasonable. Although the state tracking in Spark can be
>> somewhat expensive so it would probably make sense to benchmark to see if
>> it meets our needs.
>>
> So the problem is, I don't think this can be made to work using
> mapWithState. It doesn't allow a mapping function that emits infinite
> output for an input element, directly or not.
>
So, provided there is an infinite input (eg pick a never ending queue
stream), and each call produces a finite output, we would have an infinite
number of calls.

>
> Dataflow and Flink, for example, had timer support even before SDFs, and a
> timer can set another timer and thus end up doing an infinite amount of
> work in a fault tolerant way - so SDF could be implemented on top of that.
> But AFAIK spark doesn't have a similar feature, hence my concern.
>
So we can do an inifinite queue stream which would allow us to be triggered
at each interval and handle our own persistence.

>
>
>> But these still are both DStream based rather than Dataset which we might
>> want to support (depends on what direction folks take with the runners).
>>
>> If we wanted to do this in the dataset world looking at a custom
>> sink/source would also be an option, (which is effectively what a custom
>> queue stream like thing for dstreams requires), but the datasource APIs are
>> a bit influx so if we ended up doing things at the edge of what’s allowed
>> there’s a good chance we’d have to rewrite it a few times.
>>
>>
>>>> Assuming that we have a given dstream though in Spark we can get the
>>>> underlying RDD implementation for each microbatch and do our work inside of
>>>> that.
>>>>
>>>>>
>>>>>
>>>>>>
>>>>>> More generally this does raise an important question if we want to
>>>>>> target datasets instead of rdds/DStreams in which case i would need to do
>>>>>> some more poking.
>>>>>>
>>>>>>
>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> How would timers be implemented? By outputing and reprocessing, the
>>>>>>>> same way you proposed for SDF?
>>>>>>>>
>>>>>>> i mean the timers could be inside the mappers within the system.
>>>>>> Could use a singleton so if a partition is re-executed it doesn’t end up as
>>>>>> a straggler.
>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>
>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Does Spark have support for timers? (I know it has support for
>>>>>>>>>> state)
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Could we alternatively use a state mapping function to keep
>>>>>>>>>>> track of the computation so far instead of outputting V each time? (also
>>>>>>>>>>> the progress so far is probably of a different type R rather than V).
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some sketchy thinking
>>>>>>>>>>>> about this last year but didn't get very far.
>>>>>>>>>>>>
>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>> For input type T
>>>>>>>>>>>> Output type V
>>>>>>>>>>>>
>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>> and if the computation finishes T will be populated otherwise V
>>>>>>>>>>>> will be
>>>>>>>>>>>>
>>>>>>>>>>>> For determining how long to run we'd up to either K seconds or
>>>>>>>>>>>> listen for a signal on a port
>>>>>>>>>>>>
>>>>>>>>>>>> Once we're done running we take the result and filter for the
>>>>>>>>>>>> ones with T and V into seperate collections re-run until finished
>>>>>>>>>>>> and then union the results
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>> complicated and I figured terrible was a good place to start and improve
>>>>>>>>>>>> from.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Let me know your thoughts, especially the parts where this is
>>>>>>>>>>>> worse than I remember because its been awhile since I thought about this.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>
>>>>>>>> --
>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>
>>>>> --
>>>> Twitter: https://twitter.com/holdenkarau
>>>>
>>> --
>> Twitter: https://twitter.com/holdenkarau
>>
> --
Twitter: https://twitter.com/holdenkarau

Re: Splittable DoFN in Spark discussion

Posted by Eugene Kirpichov <ki...@google.com>.
On Fri, Mar 23, 2018, 11:17 PM Holden Karau <ho...@pigscanfly.ca> wrote:

> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <ho...@pigscanfly.ca>
>> wrote:
>>
>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>>
>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <ho...@pigscanfly.ca>
>>>> wrote:
>>>>
>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <ki...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark
>>>>>> runner streaming. Holden, is it correct that Spark appears to have no way
>>>>>> at all to produce an infinite DStream from a finite RDD? Maybe we can
>>>>>> somehow dynamically create a new DStream for every initial restriction,
>>>>>> said DStream being obtained using a Receiver that under the hood actually
>>>>>> runs the SDF? (this is of course less efficient than a timer-capable runner
>>>>>> would do, and I have doubts about the fault tolerance)
>>>>>>
>>>>> So on the streaming side we could simply do it with a fixed number of
>>>>> levels on DStreams. It’s not great but it would work.
>>>>>
>>>> Not sure I understand this. Let me try to clarify what SDF demands of
>>>> the runner. Imagine the following case: a file contains a list of "master"
>>>> Kafka topics, on which there are published additional Kafka topics to read.
>>>>
>>>> PCollection<String> masterTopics = TextIO.read().from(masterTopicsFile)
>>>> PCollection<String> nestedTopics =
>>>> masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>> PCollection<String> records = nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>
>>>> This exemplifies both use cases of a streaming SDF that emits infinite
>>>> output for every input:
>>>> - Applying it to a finite set of inputs (in this case to the result of
>>>> reading a text file)
>>>> - Applying it to an infinite set of inputs (i.e. having an unbounded
>>>> number of streams being read concurrently, each of the streams themselves
>>>> is unbounded too)
>>>>
>>>> Does the multi-level solution you have in mind work for this case? I
>>>> suppose the second case is harder, so we can focus on that.
>>>>
>>> So none of those are a splittabledofn right?
>>>
>> Not sure what you mean? ReadFromKafkaFn in these examples is a splittable
>> DoFn and we're trying to figure out how to make Spark run it.
>>
>>
> Ah ok, sorry I saw that and for some reason parsed them as old style DoFns
> in my head.
>
> To effectively allow us to union back into the “same” DStream  we’d have
> to end up using Sparks queue streams (or their equivalent custom source
> because of some queue stream limitations), which invites some reliability
> challenges. This might be at the point where I should send a diagram/some
> sample code since it’s a bit convoluted.
>
> The more I think about the jumps required to make the “simple” union
> approach work, the more it seems just using the statemapping for steaming
> is probably more reasonable. Although the state tracking in Spark can be
> somewhat expensive so it would probably make sense to benchmark to see if
> it meets our needs.
>
So the problem is, I don't think this can be made to work using
mapWithState. It doesn't allow a mapping function that emits infinite
output for an input element, directly or not.

Dataflow and Flink, for example, had timer support even before SDFs, and a
timer can set another timer and thus end up doing an infinite amount of
work in a fault tolerant way - so SDF could be implemented on top of that.
But AFAIK spark doesn't have a similar feature, hence my concern.


> But these still are both DStream based rather than Dataset which we might
> want to support (depends on what direction folks take with the runners).
>
> If we wanted to do this in the dataset world looking at a custom
> sink/source would also be an option, (which is effectively what a custom
> queue stream like thing for dstreams requires), but the datasource APIs are
> a bit influx so if we ended up doing things at the edge of what’s allowed
> there’s a good chance we’d have to rewrite it a few times.
>
>
>>> Assuming that we have a given dstream though in Spark we can get the
>>> underlying RDD implementation for each microbatch and do our work inside of
>>> that.
>>>
>>>>
>>>>
>>>>>
>>>>> More generally this does raise an important question if we want to
>>>>> target datasets instead of rdds/DStreams in which case i would need to do
>>>>> some more poking.
>>>>>
>>>>>
>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> How would timers be implemented? By outputing and reprocessing, the
>>>>>>> same way you proposed for SDF?
>>>>>>>
>>>>>> i mean the timers could be inside the mappers within the system.
>>>>> Could use a singleton so if a partition is re-executed it doesn’t end up as
>>>>> a straggler.
>>>>>
>>>>>>
>>>>>>>
>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>
>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>
>>>>>>>>> Does Spark have support for timers? (I know it has support for
>>>>>>>>> state)
>>>>>>>>>
>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Could we alternatively use a state mapping function to keep track
>>>>>>>>>> of the computation so far instead of outputting V each time? (also the
>>>>>>>>>> progress so far is probably of a different type R rather than V).
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>> holden@pigscanfly.ca> wrote:
>>>>>>>>>>
>>>>>>>>>>> So we had a quick chat about what it would take to add something
>>>>>>>>>>> like SplittableDoFns to Spark. I'd done some sketchy thinking about this
>>>>>>>>>>> last year but didn't get very far.
>>>>>>>>>>>
>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>> For input type T
>>>>>>>>>>> Output type V
>>>>>>>>>>>
>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>> and if the computation finishes T will be populated otherwise V
>>>>>>>>>>> will be
>>>>>>>>>>>
>>>>>>>>>>> For determining how long to run we'd up to either K seconds or
>>>>>>>>>>> listen for a signal on a port
>>>>>>>>>>>
>>>>>>>>>>> Once we're done running we take the result and filter for the
>>>>>>>>>>> ones with T and V into seperate collections re-run until finished
>>>>>>>>>>> and then union the results
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>> complicated and I figured terrible was a good place to start and improve
>>>>>>>>>>> from.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Let me know your thoughts, especially the parts where this is
>>>>>>>>>>> worse than I remember because its been awhile since I thought about this.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>
>>>>>>> --
>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>
>>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>>
>> --
> Twitter: https://twitter.com/holdenkarau
>

Re: Splittable DoFN in Spark discussion

Posted by Holden Karau <ho...@pigscanfly.ca>.
On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <ki...@google.com>
wrote:

> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <ho...@pigscanfly.ca> wrote:
>
>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <ho...@pigscanfly.ca>
>>> wrote:
>>>
>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>>
>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark
>>>>> runner streaming. Holden, is it correct that Spark appears to have no way
>>>>> at all to produce an infinite DStream from a finite RDD? Maybe we can
>>>>> somehow dynamically create a new DStream for every initial restriction,
>>>>> said DStream being obtained using a Receiver that under the hood actually
>>>>> runs the SDF? (this is of course less efficient than a timer-capable runner
>>>>> would do, and I have doubts about the fault tolerance)
>>>>>
>>>> So on the streaming side we could simply do it with a fixed number of
>>>> levels on DStreams. It’s not great but it would work.
>>>>
>>> Not sure I understand this. Let me try to clarify what SDF demands of
>>> the runner. Imagine the following case: a file contains a list of "master"
>>> Kafka topics, on which there are published additional Kafka topics to read.
>>>
>>> PCollection<String> masterTopics = TextIO.read().from(masterTopicsFile)
>>> PCollection<String> nestedTopics =
>>> masterTopics.apply(ParDo(ReadFromKafkaFn))
>>> PCollection<String> records = nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>
>>> This exemplifies both use cases of a streaming SDF that emits infinite
>>> output for every input:
>>> - Applying it to a finite set of inputs (in this case to the result of
>>> reading a text file)
>>> - Applying it to an infinite set of inputs (i.e. having an unbounded
>>> number of streams being read concurrently, each of the streams themselves
>>> is unbounded too)
>>>
>>> Does the multi-level solution you have in mind work for this case? I
>>> suppose the second case is harder, so we can focus on that.
>>>
>> So none of those are a splittabledofn right?
>>
> Not sure what you mean? ReadFromKafkaFn in these examples is a splittable
> DoFn and we're trying to figure out how to make Spark run it.
>
>
Ah ok, sorry I saw that and for some reason parsed them as old style DoFns
in my head.

To effectively allow us to union back into the “same” DStream  we’d have to
end up using Sparks queue streams (or their equivalent custom source
because of some queue stream limitations), which invites some reliability
challenges. This might be at the point where I should send a diagram/some
sample code since it’s a bit convoluted.

The more I think about the jumps required to make the “simple” union
approach work, the more it seems just using the statemapping for steaming
is probably more reasonable. Although the state tracking in Spark can be
somewhat expensive so it would probably make sense to benchmark to see if
it meets our needs.

But these still are both DStream based rather than Dataset which we might
want to support (depends on what direction folks take with the runners).

If we wanted to do this in the dataset world looking at a custom
sink/source would also be an option, (which is effectively what a custom
queue stream like thing for dstreams requires), but the datasource APIs are
a bit influx so if we ended up doing things at the edge of what’s allowed
there’s a good chance we’d have to rewrite it a few times.


>> Assuming that we have a given dstream though in Spark we can get the
>> underlying RDD implementation for each microbatch and do our work inside of
>> that.
>>
>>>
>>>
>>>>
>>>> More generally this does raise an important question if we want to
>>>> target datasets instead of rdds/DStreams in which case i would need to do
>>>> some more poking.
>>>>
>>>>
>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> How would timers be implemented? By outputing and reprocessing, the
>>>>>> same way you proposed for SDF?
>>>>>>
>>>>> i mean the timers could be inside the mappers within the system. Could
>>>> use a singleton so if a partition is re-executed it doesn’t end up as a
>>>> straggler.
>>>>
>>>>>
>>>>>>
>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>> So the timers would have to be in our own code.
>>>>>>>
>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>> kirpichov@google.com> wrote:
>>>>>>>
>>>>>>>> Does Spark have support for timers? (I know it has support for
>>>>>>>> state)
>>>>>>>>
>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Could we alternatively use a state mapping function to keep track
>>>>>>>>> of the computation so far instead of outputting V each time? (also the
>>>>>>>>> progress so far is probably of a different type R rather than V).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> So we had a quick chat about what it would take to add something
>>>>>>>>>> like SplittableDoFns to Spark. I'd done some sketchy thinking about this
>>>>>>>>>> last year but didn't get very far.
>>>>>>>>>>
>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>> For input type T
>>>>>>>>>> Output type V
>>>>>>>>>>
>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>> and if the computation finishes T will be populated otherwise V
>>>>>>>>>> will be
>>>>>>>>>>
>>>>>>>>>> For determining how long to run we'd up to either K seconds or
>>>>>>>>>> listen for a signal on a port
>>>>>>>>>>
>>>>>>>>>> Once we're done running we take the result and filter for the
>>>>>>>>>> ones with T and V into seperate collections re-run until finished
>>>>>>>>>> and then union the results
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> This is maybe not a great design but it was minimally complicated
>>>>>>>>>> and I figured terrible was a good place to start and improve from.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Let me know your thoughts, especially the parts where this is
>>>>>>>>>> worse than I remember because its been awhile since I thought about this.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>
>>>>>>>>> --
>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>
>>>>>> --
>>>> Twitter: https://twitter.com/holdenkarau
>>>>
>>> --
>> Twitter: https://twitter.com/holdenkarau
>>
> --
Twitter: https://twitter.com/holdenkarau

Re: Splittable DoFN in Spark discussion

Posted by Eugene Kirpichov <ki...@google.com>.
On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <ho...@pigscanfly.ca> wrote:

> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <ho...@pigscanfly.ca>
>> wrote:
>>
>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>>
>>>> Reviving this thread. I think SDF is a pretty big risk for Spark runner
>>>> streaming. Holden, is it correct that Spark appears to have no way at all
>>>> to produce an infinite DStream from a finite RDD? Maybe we can somehow
>>>> dynamically create a new DStream for every initial restriction, said
>>>> DStream being obtained using a Receiver that under the hood actually runs
>>>> the SDF? (this is of course less efficient than a timer-capable runner
>>>> would do, and I have doubts about the fault tolerance)
>>>>
>>> So on the streaming side we could simply do it with a fixed number of
>>> levels on DStreams. It’s not great but it would work.
>>>
>> Not sure I understand this. Let me try to clarify what SDF demands of the
>> runner. Imagine the following case: a file contains a list of "master"
>> Kafka topics, on which there are published additional Kafka topics to read.
>>
>> PCollection<String> masterTopics = TextIO.read().from(masterTopicsFile)
>> PCollection<String> nestedTopics =
>> masterTopics.apply(ParDo(ReadFromKafkaFn))
>> PCollection<String> records = nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>
>> This exemplifies both use cases of a streaming SDF that emits infinite
>> output for every input:
>> - Applying it to a finite set of inputs (in this case to the result of
>> reading a text file)
>> - Applying it to an infinite set of inputs (i.e. having an unbounded
>> number of streams being read concurrently, each of the streams themselves
>> is unbounded too)
>>
>> Does the multi-level solution you have in mind work for this case? I
>> suppose the second case is harder, so we can focus on that.
>>
> So none of those are a splittabledofn right?
>
Not sure what you mean? ReadFromKafkaFn in these examples is a splittable
DoFn and we're trying to figure out how to make Spark run it.


>
> Assuming that we have a given dstream though in Spark we can get the
> underlying RDD implementation for each microbatch and do our work inside of
> that.
>
>>
>>
>>>
>>> More generally this does raise an important question if we want to
>>> target datasets instead of rdds/DStreams in which case i would need to do
>>> some more poking.
>>>
>>>
>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> How would timers be implemented? By outputing and reprocessing, the
>>>>> same way you proposed for SDF?
>>>>>
>>>> i mean the timers could be inside the mappers within the system. Could
>>> use a singleton so if a partition is re-executed it doesn’t end up as a
>>> straggler.
>>>
>>>>
>>>>>
>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <ho...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> So the timers would have to be in our own code.
>>>>>>
>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>> kirpichov@google.com> wrote:
>>>>>>
>>>>>>> Does Spark have support for timers? (I know it has support for state)
>>>>>>>
>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Could we alternatively use a state mapping function to keep track
>>>>>>>> of the computation so far instead of outputting V each time? (also the
>>>>>>>> progress so far is probably of a different type R rather than V).
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> So we had a quick chat about what it would take to add something
>>>>>>>>> like SplittableDoFns to Spark. I'd done some sketchy thinking about this
>>>>>>>>> last year but didn't get very far.
>>>>>>>>>
>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>> For input type T
>>>>>>>>> Output type V
>>>>>>>>>
>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>> and if the computation finishes T will be populated otherwise V
>>>>>>>>> will be
>>>>>>>>>
>>>>>>>>> For determining how long to run we'd up to either K seconds or
>>>>>>>>> listen for a signal on a port
>>>>>>>>>
>>>>>>>>> Once we're done running we take the result and filter for the ones
>>>>>>>>> with T and V into seperate collections re-run until finished
>>>>>>>>> and then union the results
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This is maybe not a great design but it was minimally complicated
>>>>>>>>> and I figured terrible was a good place to start and improve from.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Let me know your thoughts, especially the parts where this is
>>>>>>>>> worse than I remember because its been awhile since I thought about this.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>
>>>>>>>> --
>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>
>>>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>>
>> --
> Twitter: https://twitter.com/holdenkarau
>

Re: Splittable DoFN in Spark discussion

Posted by Holden Karau <ho...@pigscanfly.ca>.
On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <ki...@google.com>
wrote:

> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <ho...@pigscanfly.ca> wrote:
>
>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>> Reviving this thread. I think SDF is a pretty big risk for Spark runner
>>> streaming. Holden, is it correct that Spark appears to have no way at all
>>> to produce an infinite DStream from a finite RDD? Maybe we can somehow
>>> dynamically create a new DStream for every initial restriction, said
>>> DStream being obtained using a Receiver that under the hood actually runs
>>> the SDF? (this is of course less efficient than a timer-capable runner
>>> would do, and I have doubts about the fault tolerance)
>>>
>> So on the streaming side we could simply do it with a fixed number of
>> levels on DStreams. It’s not great but it would work.
>>
> Not sure I understand this. Let me try to clarify what SDF demands of the
> runner. Imagine the following case: a file contains a list of "master"
> Kafka topics, on which there are published additional Kafka topics to read.
>
> PCollection<String> masterTopics = TextIO.read().from(masterTopicsFile)
> PCollection<String> nestedTopics =
> masterTopics.apply(ParDo(ReadFromKafkaFn))
> PCollection<String> records = nestedTopics.apply(ParDo(ReadFromKafkaFn))
>
> This exemplifies both use cases of a streaming SDF that emits infinite
> output for every input:
> - Applying it to a finite set of inputs (in this case to the result of
> reading a text file)
> - Applying it to an infinite set of inputs (i.e. having an unbounded
> number of streams being read concurrently, each of the streams themselves
> is unbounded too)
>
> Does the multi-level solution you have in mind work for this case? I
> suppose the second case is harder, so we can focus on that.
>
So none of those are a splittabledofn right?

Assuming that we have a given dstream though in Spark we can get the
underlying RDD implementation for each microbatch and do our work inside of
that.

>
>
>>
>> More generally this does raise an important question if we want to target
>> datasets instead of rdds/DStreams in which case i would need to do some
>> more poking.
>>
>>
>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> How would timers be implemented? By outputing and reprocessing, the
>>>> same way you proposed for SDF?
>>>>
>>> i mean the timers could be inside the mappers within the system. Could
>> use a singleton so if a partition is re-executed it doesn’t end up as a
>> straggler.
>>
>>>
>>>>
>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <ho...@pigscanfly.ca>
>>>> wrote:
>>>>
>>>>> So the timers would have to be in our own code.
>>>>>
>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <ki...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Does Spark have support for timers? (I know it has support for state)
>>>>>>
>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Could we alternatively use a state mapping function to keep track of
>>>>>>> the computation so far instead of outputting V each time? (also the
>>>>>>> progress so far is probably of a different type R rather than V).
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> So we had a quick chat about what it would take to add something
>>>>>>>> like SplittableDoFns to Spark. I'd done some sketchy thinking about this
>>>>>>>> last year but didn't get very far.
>>>>>>>>
>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>> For input type T
>>>>>>>> Output type V
>>>>>>>>
>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>> and if the computation finishes T will be populated otherwise V
>>>>>>>> will be
>>>>>>>>
>>>>>>>> For determining how long to run we'd up to either K seconds or
>>>>>>>> listen for a signal on a port
>>>>>>>>
>>>>>>>> Once we're done running we take the result and filter for the ones
>>>>>>>> with T and V into seperate collections re-run until finished
>>>>>>>> and then union the results
>>>>>>>>
>>>>>>>>
>>>>>>>> This is maybe not a great design but it was minimally complicated
>>>>>>>> and I figured terrible was a good place to start and improve from.
>>>>>>>>
>>>>>>>>
>>>>>>>> Let me know your thoughts, especially the parts where this is worse
>>>>>>>> than I remember because its been awhile since I thought about this.
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>
>>>>>>> --
>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>
>>>> --
>> Twitter: https://twitter.com/holdenkarau
>>
> --
Twitter: https://twitter.com/holdenkarau

Re: Splittable DoFN in Spark discussion

Posted by Eugene Kirpichov <ki...@google.com>.
On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <ho...@pigscanfly.ca> wrote:

> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> Reviving this thread. I think SDF is a pretty big risk for Spark runner
>> streaming. Holden, is it correct that Spark appears to have no way at all
>> to produce an infinite DStream from a finite RDD? Maybe we can somehow
>> dynamically create a new DStream for every initial restriction, said
>> DStream being obtained using a Receiver that under the hood actually runs
>> the SDF? (this is of course less efficient than a timer-capable runner
>> would do, and I have doubts about the fault tolerance)
>>
> So on the streaming side we could simply do it with a fixed number of
> levels on DStreams. It’s not great but it would work.
>
Not sure I understand this. Let me try to clarify what SDF demands of the
runner. Imagine the following case: a file contains a list of "master"
Kafka topics, on which there are published additional Kafka topics to read.

PCollection<String> masterTopics = TextIO.read().from(masterTopicsFile)
PCollection<String> nestedTopics =
masterTopics.apply(ParDo(ReadFromKafkaFn))
PCollection<String> records = nestedTopics.apply(ParDo(ReadFromKafkaFn))

This exemplifies both use cases of a streaming SDF that emits infinite
output for every input:
- Applying it to a finite set of inputs (in this case to the result of
reading a text file)
- Applying it to an infinite set of inputs (i.e. having an unbounded number
of streams being read concurrently, each of the streams themselves is
unbounded too)

Does the multi-level solution you have in mind work for this case? I
suppose the second case is harder, so we can focus on that.


>
> More generally this does raise an important question if we want to target
> datasets instead of rdds/DStreams in which case i would need to do some
> more poking.
>
>
>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com> wrote:
>>
>>> How would timers be implemented? By outputing and reprocessing, the same
>>> way you proposed for SDF?
>>>
>> i mean the timers could be inside the mappers within the system. Could
> use a singleton so if a partition is re-executed it doesn’t end up as a
> straggler.
>
>>
>>>
>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <ho...@pigscanfly.ca>
>>> wrote:
>>>
>>>> So the timers would have to be in our own code.
>>>>
>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>>
>>>>> Does Spark have support for timers? (I know it has support for state)
>>>>>
>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Could we alternatively use a state mapping function to keep track of
>>>>>> the computation so far instead of outputting V each time? (also the
>>>>>> progress so far is probably of a different type R rather than V).
>>>>>>
>>>>>>
>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <ho...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>> So we had a quick chat about what it would take to add something
>>>>>>> like SplittableDoFns to Spark. I'd done some sketchy thinking about this
>>>>>>> last year but didn't get very far.
>>>>>>>
>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>> For input type T
>>>>>>> Output type V
>>>>>>>
>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>> and if the computation finishes T will be populated otherwise V will
>>>>>>> be
>>>>>>>
>>>>>>> For determining how long to run we'd up to either K seconds or
>>>>>>> listen for a signal on a port
>>>>>>>
>>>>>>> Once we're done running we take the result and filter for the ones
>>>>>>> with T and V into seperate collections re-run until finished
>>>>>>> and then union the results
>>>>>>>
>>>>>>>
>>>>>>> This is maybe not a great design but it was minimally complicated
>>>>>>> and I figured terrible was a good place to start and improve from.
>>>>>>>
>>>>>>>
>>>>>>> Let me know your thoughts, especially the parts where this is worse
>>>>>>> than I remember because its been awhile since I thought about this.
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>
>>>>>> --
>>>> Twitter: https://twitter.com/holdenkarau
>>>>
>>> --
> Twitter: https://twitter.com/holdenkarau
>

Re: Splittable DoFN in Spark discussion

Posted by Holden Karau <ho...@pigscanfly.ca>.
On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <ki...@google.com>
wrote:

> Reviving this thread. I think SDF is a pretty big risk for Spark runner
> streaming. Holden, is it correct that Spark appears to have no way at all
> to produce an infinite DStream from a finite RDD? Maybe we can somehow
> dynamically create a new DStream for every initial restriction, said
> DStream being obtained using a Receiver that under the hood actually runs
> the SDF? (this is of course less efficient than a timer-capable runner
> would do, and I have doubts about the fault tolerance)
>
So on the streaming side we could simply do it with a fixed number of
levels on DStreams. It’s not great but it would work.

More generally this does raise an important question if we want to target
datasets instead of rdds/DStreams in which case i would need to do some
more poking.


> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com> wrote:
>
>> How would timers be implemented? By outputing and reprocessing, the same
>> way you proposed for SDF?
>>
> i mean the timers could be inside the mappers within the system. Could use
a singleton so if a partition is re-executed it doesn’t end up as a
straggler.

>
>>
>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <ho...@pigscanfly.ca>
>> wrote:
>>
>>> So the timers would have to be in our own code.
>>>
>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>>
>>>> Does Spark have support for timers? (I know it has support for state)
>>>>
>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Could we alternatively use a state mapping function to keep track of
>>>>> the computation so far instead of outputting V each time? (also the
>>>>> progress so far is probably of a different type R rather than V).
>>>>>
>>>>>
>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <ho...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> So we had a quick chat about what it would take to add something like
>>>>>> SplittableDoFns to Spark. I'd done some sketchy thinking about this last
>>>>>> year but didn't get very far.
>>>>>>
>>>>>> My back-of-the-envelope design was as follows:
>>>>>> For input type T
>>>>>> Output type V
>>>>>>
>>>>>> Implement a mapper which outputs type (T, V)
>>>>>> and if the computation finishes T will be populated otherwise V will
>>>>>> be
>>>>>>
>>>>>> For determining how long to run we'd up to either K seconds or listen
>>>>>> for a signal on a port
>>>>>>
>>>>>> Once we're done running we take the result and filter for the ones
>>>>>> with T and V into seperate collections re-run until finished
>>>>>> and then union the results
>>>>>>
>>>>>>
>>>>>> This is maybe not a great design but it was minimally complicated and
>>>>>> I figured terrible was a good place to start and improve from.
>>>>>>
>>>>>>
>>>>>> Let me know your thoughts, especially the parts where this is worse
>>>>>> than I remember because its been awhile since I thought about this.
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>
>>>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>>
>> --
Twitter: https://twitter.com/holdenkarau

Re: Splittable DoFN in Spark discussion

Posted by Eugene Kirpichov <ki...@google.com>.
Reviving this thread. I think SDF is a pretty big risk for Spark runner
streaming. Holden, is it correct that Spark appears to have no way at all
to produce an infinite DStream from a finite RDD? Maybe we can somehow
dynamically create a new DStream for every initial restriction, said
DStream being obtained using a Receiver that under the hood actually runs
the SDF? (this is of course less efficient than a timer-capable runner
would do, and I have doubts about the fault tolerance)

On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com> wrote:

> How would timers be implemented? By outputing and reprocessing, the same
> way you proposed for SDF?
>
>
> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <ho...@pigscanfly.ca> wrote:
>
>> So the timers would have to be in our own code.
>>
>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>> Does Spark have support for timers? (I know it has support for state)
>>>
>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Could we alternatively use a state mapping function to keep track of
>>>> the computation so far instead of outputting V each time? (also the
>>>> progress so far is probably of a different type R rather than V).
>>>>
>>>>
>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <ho...@pigscanfly.ca>
>>>> wrote:
>>>>
>>>>> So we had a quick chat about what it would take to add something like
>>>>> SplittableDoFns to Spark. I'd done some sketchy thinking about this last
>>>>> year but didn't get very far.
>>>>>
>>>>> My back-of-the-envelope design was as follows:
>>>>> For input type T
>>>>> Output type V
>>>>>
>>>>> Implement a mapper which outputs type (T, V)
>>>>> and if the computation finishes T will be populated otherwise V will be
>>>>>
>>>>> For determining how long to run we'd up to either K seconds or listen
>>>>> for a signal on a port
>>>>>
>>>>> Once we're done running we take the result and filter for the ones
>>>>> with T and V into seperate collections re-run until finished
>>>>> and then union the results
>>>>>
>>>>>
>>>>> This is maybe not a great design but it was minimally complicated and
>>>>> I figured terrible was a good place to start and improve from.
>>>>>
>>>>>
>>>>> Let me know your thoughts, especially the parts where this is worse
>>>>> than I remember because its been awhile since I thought about this.
>>>>>
>>>>>
>>>>> --
>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>
>>>> --
>> Twitter: https://twitter.com/holdenkarau
>>
>

Re: Splittable DoFN in Spark discussion

Posted by Reuven Lax <re...@google.com>.
How would timers be implemented? By outputing and reprocessing, the same
way you proposed for SDF?


On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <ho...@pigscanfly.ca> wrote:

> So the timers would have to be in our own code.
>
> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> Does Spark have support for timers? (I know it has support for state)
>>
>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Could we alternatively use a state mapping function to keep track of the
>>> computation so far instead of outputting V each time? (also the progress so
>>> far is probably of a different type R rather than V).
>>>
>>>
>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <ho...@pigscanfly.ca>
>>> wrote:
>>>
>>>> So we had a quick chat about what it would take to add something like
>>>> SplittableDoFns to Spark. I'd done some sketchy thinking about this last
>>>> year but didn't get very far.
>>>>
>>>> My back-of-the-envelope design was as follows:
>>>> For input type T
>>>> Output type V
>>>>
>>>> Implement a mapper which outputs type (T, V)
>>>> and if the computation finishes T will be populated otherwise V will be
>>>>
>>>> For determining how long to run we'd up to either K seconds or listen
>>>> for a signal on a port
>>>>
>>>> Once we're done running we take the result and filter for the ones with
>>>> T and V into seperate collections re-run until finished
>>>> and then union the results
>>>>
>>>>
>>>> This is maybe not a great design but it was minimally complicated and I
>>>> figured terrible was a good place to start and improve from.
>>>>
>>>>
>>>> Let me know your thoughts, especially the parts where this is worse
>>>> than I remember because its been awhile since I thought about this.
>>>>
>>>>
>>>> --
>>>> Twitter: https://twitter.com/holdenkarau
>>>>
>>> --
> Twitter: https://twitter.com/holdenkarau
>

Re: Splittable DoFN in Spark discussion

Posted by Holden Karau <ho...@pigscanfly.ca>.
So the timers would have to be in our own code.

On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <ki...@google.com>
wrote:

> Does Spark have support for timers? (I know it has support for state)
>
> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com> wrote:
>
>> Could we alternatively use a state mapping function to keep track of the
>> computation so far instead of outputting V each time? (also the progress so
>> far is probably of a different type R rather than V).
>>
>>
>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <ho...@pigscanfly.ca>
>> wrote:
>>
>>> So we had a quick chat about what it would take to add something like
>>> SplittableDoFns to Spark. I'd done some sketchy thinking about this last
>>> year but didn't get very far.
>>>
>>> My back-of-the-envelope design was as follows:
>>> For input type T
>>> Output type V
>>>
>>> Implement a mapper which outputs type (T, V)
>>> and if the computation finishes T will be populated otherwise V will be
>>>
>>> For determining how long to run we'd up to either K seconds or listen
>>> for a signal on a port
>>>
>>> Once we're done running we take the result and filter for the ones with
>>> T and V into seperate collections re-run until finished
>>> and then union the results
>>>
>>>
>>> This is maybe not a great design but it was minimally complicated and I
>>> figured terrible was a good place to start and improve from.
>>>
>>>
>>> Let me know your thoughts, especially the parts where this is worse than
>>> I remember because its been awhile since I thought about this.
>>>
>>>
>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>>
>> --
Twitter: https://twitter.com/holdenkarau

Re: Splittable DoFN in Spark discussion

Posted by Eugene Kirpichov <ki...@google.com>.
Does Spark have support for timers? (I know it has support for state)

On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com> wrote:

> Could we alternatively use a state mapping function to keep track of the
> computation so far instead of outputting V each time? (also the progress so
> far is probably of a different type R rather than V).
>
>
> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <ho...@pigscanfly.ca> wrote:
>
>> So we had a quick chat about what it would take to add something like
>> SplittableDoFns to Spark. I'd done some sketchy thinking about this last
>> year but didn't get very far.
>>
>> My back-of-the-envelope design was as follows:
>> For input type T
>> Output type V
>>
>> Implement a mapper which outputs type (T, V)
>> and if the computation finishes T will be populated otherwise V will be
>>
>> For determining how long to run we'd up to either K seconds or listen for
>> a signal on a port
>>
>> Once we're done running we take the result and filter for the ones with T
>> and V into seperate collections re-run until finished
>> and then union the results
>>
>>
>> This is maybe not a great design but it was minimally complicated and I
>> figured terrible was a good place to start and improve from.
>>
>>
>> Let me know your thoughts, especially the parts where this is worse than
>> I remember because its been awhile since I thought about this.
>>
>>
>> --
>> Twitter: https://twitter.com/holdenkarau
>>
>

Re: Splittable DoFN in Spark discussion

Posted by Reuven Lax <re...@google.com>.
Could we alternatively use a state mapping function to keep track of the
computation so far instead of outputting V each time? (also the progress so
far is probably of a different type R rather than V).


On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <ho...@pigscanfly.ca> wrote:

> So we had a quick chat about what it would take to add something like
> SplittableDoFns to Spark. I'd done some sketchy thinking about this last
> year but didn't get very far.
>
> My back-of-the-envelope design was as follows:
> For input type T
> Output type V
>
> Implement a mapper which outputs type (T, V)
> and if the computation finishes T will be populated otherwise V will be
>
> For determining how long to run we'd up to either K seconds or listen for
> a signal on a port
>
> Once we're done running we take the result and filter for the ones with T
> and V into seperate collections re-run until finished
> and then union the results
>
>
> This is maybe not a great design but it was minimally complicated and I
> figured terrible was a good place to start and improve from.
>
>
> Let me know your thoughts, especially the parts where this is worse than I
> remember because its been awhile since I thought about this.
>
>
> --
> Twitter: https://twitter.com/holdenkarau
>