Posted to dev@beam.apache.org by Amit Sela <am...@gmail.com> on 2016/10/07 23:55:48 UTC

[DISCUSS] UnboundedSource and the KafkaIO.

I started a thread about (suggesting) UnboundedSource split IDs, and it
turned into an UnboundedSource/KafkaIO discussion, so I think it's best to
start over in a clear [DISCUSS] thread.

While working on UnboundedSource support for the Spark runner, I raised
some questions; some were general to UnboundedSource, and others were
Kafka-specific.

I'd like to recap them here, and maybe have a more productive and
well-documented discussion for everyone.

   1. UnboundedSource IDs - I assume any runner persists the
   UnboundedSource's CheckpointMark for fault tolerance, but I wonder how it
   matches the appropriate split (of the UnboundedSource) to its previously
   persisted CheckpointMark on any given worker.
   *Thomas Groh* mentioned that Source splits have to have an associated
   identifier, and so the runner gets to tag splits however it pleases, so
   long as those tags don't allow splits to bleed into each other.
   2. Consistent splitting - an UnboundedSource seems to require consistent
   splitting if it is to "pick up where it left off", correct? This is not
   mentioned as a requirement or a recommendation in
   UnboundedSource#generateInitialSplits(), so is this a Kafka-only issue?
   *Raghu Angadi* mentioned that Kafka already does so by assigning
   partitions to readers in a round-robin manner.
   *Thomas Groh* also added that while the UnboundedSource API doesn't
   require deterministic splitting (although it's recommended), a
   PipelineRunner should keep track of the initially generated splits.
   3. Support reading of Kafka partitions that were added to topic(s) while
   a Pipeline reads from them - BEAM-727
   <https://issues.apache.org/jira/browse/BEAM-727> was filed.
   4. Reading/persisting Kafka start offsets - since Spark works in
   micro-batches, if "latest" was applied on a fairly sparse topic, each
   worker would actually begin reading only after it saw a message during
   the time window it had to read messages. This is because fetching the
   offsets is done by the worker running the Reader. This means that each
   Reader sees a different state of "latest" (for its partition(s)), such
   that a failing Reader that hasn't read anything yet might fetch a
   different "latest" once it has recovered than what it originally
   fetched. While this may not be as painful for other runners, IMHO it
   lacks correctness, and I'd suggest either reading the Kafka cluster's
   metadata once upon initial splitting, or adding some of it to the
   CheckpointMark. Filed BEAM-704
   <https://issues.apache.org/jira/browse/BEAM-704>.
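A minimal sketch of items 1 and 2, written in plain Python rather than against the Beam Java SDK. Partitions are assigned to splits round-robin, as Raghu described for KafkaIO. Each split gets a stable id, and a runner could key each split's persisted CheckpointMark by that id. Sorting the partitions before assigning them is an assumption made here to keep the result deterministic. `Split` and `generate_initial_splits` are hypothetical names, not Beam API.

```python
from dataclasses import dataclass
from typing import List, Tuple

# A Kafka partition, identified by (topic, partition number).
Partition = Tuple[str, int]


@dataclass(frozen=True)
class Split:
    """Hypothetical split: a stable id plus the partitions it reads."""
    split_id: int
    partitions: Tuple[Partition, ...]


def generate_initial_splits(partitions: List[Partition], desired_splits: int) -> List[Split]:
    """Assign partitions to splits round-robin, deterministically.

    Partitions are sorted first, so the result does not depend on the
    order in which the cluster happened to list them: the same inputs
    always produce the same split ids and the same assignment.
    """
    ordered = sorted(partitions)
    num_splits = max(1, min(desired_splits, len(ordered)))
    buckets: List[List[Partition]] = [[] for _ in range(num_splits)]
    for index, partition in enumerate(ordered):
        buckets[index % num_splits].append(partition)
    # The split id is simply the bucket index; a runner could key each
    # split's persisted checkpoint by this id.
    return [Split(i, tuple(bucket)) for i, bucket in enumerate(buckets)]
```

The assignment is a pure function of the sorted partition list, so re-running it yields the same ids. That lets each split find its persisted checkpoint again.

It also hints at why BEAM-727 needs care. Take two topics split two ways: `("b", 0)` belongs to split 1. Once a new partition `("a", 1)` is added, `("b", 0)` moves to split 0. This is one reason to keep the initially generated splits rather than re-splitting.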

The original thread is called "Should UnboundedSource provide a split
identifier ?".

While Kafka is the only specific implementation of UnboundedSource discussed
here, it is probably the most popular open-source UnboundedSource. Having
said that, I wonder how this applies to PubSub, or to any other
UnboundedSource that these questions might affect.

Thanks,
Amit

Re: [DISCUSS] UnboundedSource and the KafkaIO.

Posted by Amit Sela <am...@gmail.com>.
That's a good question, Robert, and I did.

First of all, the UnboundedSource is split into splits that each implement a
sort of "BoundedReadFromUnboundedSource", with restrictions on read time and
(optionally) number of records - this seems to fit nicely into the *SDF*
terminology.
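The restriction described above can be sketched as a plain Python function over an iterator of already-available records. The restriction is always a time bound and optionally a record-count bound. This is an illustration under that simplifying assumption, not Beam's `BoundedReadFromUnboundedSource`. A real `UnboundedReader` is polled through `advance()`/`getCurrent()` and may have nothing to return yet. The injectable `clock` parameter is an addition for testability.

```python
import time
from typing import Callable, Iterable, List, Optional, TypeVar

T = TypeVar("T")


def read_bounded(
    records: Iterable[T],
    max_records: Optional[int],
    max_seconds: float,
    clock: Callable[[], float] = time.monotonic,
) -> List[T]:
    """Read from a stream under a time bound and an optional count bound.

    Stops as soon as max_records items have been read (if given), the
    deadline passes, or the iterator is exhausted.
    """
    it = iter(records)
    out: List[T] = []
    deadline = clock() + max_seconds
    while (max_records is None or len(out) < max_records) and clock() < deadline:
        try:
            out.append(next(it))
        except StopIteration:
            break  # iterator exhausted: nothing more to read
    return out
```

Passing the same iterator to consecutive calls resumes where the previous call stopped. That is roughly what each micro-batch does for a given split.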

Comparing the diagram in the Spark runner's UnboundedSource design doc with
the diagram placed just above "Restrictions, blocks and positions
<https://docs.google.com/document/d/1AQmx-T9XjSi1PNoEp5_L-lT0j7BkgTbmQnc6uFEMI4c/edit#heading=h.vjs7pzbb7kw>",
there seems to be a lot of resemblance.

Finally, the idea of reading from within a DoFn is exactly what I'm doing
here, reading from within mapWithState's "mappingFunction" - a function
that maps:
 <K, Option<V>, State> *=>* Iterator<ReadElementsT>
i.e., it maps a Source (and its State and possibly restrictions) into a
bunch of read records.
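The mapping above, `<K, Option<V>, State> => Iterator<ReadElementsT>`, can be simulated without Spark:

- A dict plays the role of mapWithState's per-key state, keyed by the split's running id.
- Each micro-batch reads a bounded chunk per split and stores the updated checkpoint.

The in-memory `log` and both function names are hypothetical stand-ins. This is not Spark's `StateSpec`/`mapWithState` API.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical in-memory stand-in for the source: split id -> records.
Log = Dict[int, List[str]]


def mapping_function(
    split_id: int, checkpoint: Optional[int], log: Log, max_records: int
) -> Tuple[List[str], int]:
    """Read the next bounded chunk for one split.

    `checkpoint` is the next offset to read, or None on the first batch,
    when no state exists yet for this key. Returns the records read and
    the new checkpoint.
    """
    start = 0 if checkpoint is None else checkpoint
    chunk = log[split_id][start:start + max_records]
    return chunk, start + len(chunk)


def run_micro_batch(
    split_ids: List[int], state: Dict[int, int], log: Log, max_records: int
) -> List[str]:
    """One micro-batch over the same split ids every time.

    `state` plays the role of mapWithState's per-key state, keyed by the
    running id assigned at initial splitting.
    """
    output: List[str] = []
    for split_id in split_ids:
        records, new_checkpoint = mapping_function(
            split_id, state.get(split_id), log, max_records
        )
        state[split_id] = new_checkpoint  # kept between micro-batches
        output.extend(records)
    return output
```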

This all *seems* to be a good fit, but I'll probably have to keep following
closely to see how the API is forming.

Thanks,
Amit

On Mon, Oct 10, 2016 at 8:23 PM Robert Bradshaw <ro...@google.com.invalid>
wrote:

> Just looking to the future, have you given any thought to how well
> this would work with https://s.apache.org/splittable-do-fn?
>
> On Mon, Oct 10, 2016 at 6:35 AM, Amit Sela <am...@gmail.com> wrote:
> > Thanks Max!
> >
> > I'll try to explain Spark's stateful operators and how/why I used them
> > with UnboundedSource.
> >
> > Spark has two stateful operators: *updateStateByKey* and *mapWithState*.
> > Since updateStateByKey can only output the (updated) state itself - the
> > CheckpointMark in our case - rather than the records read, we're left
> > with mapWithState.
> > mapWithState provides a persistent, distributed "map-like" state that is
> > partitioned according to the stream. This is indeed how I manage state
> > between micro-batches.
> > However, mapWithState (like any map) gives you a value (state)
> > corresponding to a specific key, so I use a running id from the initial
> > splitting to identify the appropriate state.
> > I took a look at Flink's implementation (I do that sometimes ;-) ) and I
> > could do the same and save the split source with the CheckpointMark, but
> > it would still have to correspond to the same id. Since I had to wrap the
> > split Source to perform a sort of "BoundedReadFromUnboundedSource"
> > anyway, I simply added an id field and I'm hashing by that id.
> > I'll also add that the stateful operator can only be applied to a
> > (Pair)Stream and not to input operators, so I'm actually generating a
> > stream of splits (the same ones for every micro-batch) and reading from
> > within the mappingFunction of the mapWithState.
> >
> > It's not the simplest design, but given how Spark's persistent state and
> > InputDStream are designed compared to the Beam model, I don't see
> > another way - though I'd be happy to hear one!
> >
> > Pretty sure I've added this here, but no harm in adding the link again:
> > design doc
> > <https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing>
> > and a work-in-progress branch
> > <https://github.com/amitsela/incubator-beam/tree/BEAM-658-WIP>, both
> > also linked from the ticket <https://issues.apache.org/jira/browse/BEAM-658>.
> > The design doc also relates to how "pure" Spark works with Kafka, which I
> > think is interesting and very different from Flink/Dataflow.
> >
> > Hope this helped clear things up a little; please keep asking if
> > something is not clear yet.
> >
> > Thanks,
> > Amit.
> >
> > On Mon, Oct 10, 2016 at 4:02 PM Maximilian Michels <mx...@apache.org> wrote:
> >
> >> Just to add a comment from the Flink side and its
> >> UnboundedSourceWrapper: in our experience, the only way to guarantee
> >> deterministic splitting of the source was to generate the splits upon
> >> creation of the source and then checkpoint the assignment during
> >> runtime. When restoring from a checkpoint, the same reader
> >> configuration is restored. It's not possible to change the splitting
> >> after the initial splitting has taken place. However, Flink will soon
> >> be able to repartition the operator state upon restart/rescaling of a
> >> job.
> >>
> >> Does Spark have a way to pass the state of a previous mini batch to the
> >> current mini batch? If so, you could restore the last configuration
> >> and continue reading from the checkpointed offset. You just have to
> >> checkpoint before the mini batch ends.
> >>
> >> -Max
> >>
> >> On Mon, Oct 10, 2016 at 10:38 AM, Jean-Baptiste Onofré <jb@nanthrax.net> wrote:
> >>
> >> > Hi Amit,
> >> >
> >> > thanks for the explanation.
> >> >
> >> > For 4, you are right, it's slightly different from DataXchange (related
> >> > to the elements in the PCollection). I think storing the "starting
> >> > point" for a reader makes sense.
> >> >
> >> > Regards
> >> > JB
> >> >
> >> > On 10/10/2016 10:33 AM, Amit Sela wrote:
> >> >>
> >> >> Inline, thanks JB!
> >> >>
> >> >> On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré <jb@nanthrax.net>
> >> >> wrote:
> >> >>
> >> >>> Hi Amit,
> >> >>>
> >> >>> For 1., the runner is responsible for the checkpoint storage
> >> >>> (associated with the source). It's the way for the runner to retry
> >> >>> and know the failed bundles.
> >> >>>
> >> >> True, this was a recap/summary of another, not-so-clear, thread.
> >> >>
> >> >>> For 4, are you proposing that KafkaRecord store additional metadata
> >> >>> for that? It sounds like what I proposed in the "Technical Vision"
> >> >>> appendix document: there I proposed to introduce a DataXchange object
> >> >>> that stores some additional metadata (like offset) used by the runner.
> >> >>> It would be the same with SDF, as the tracker state should be
> >> >>> persistent as well.
> >> >>>
> >> >> I think I was more focused on persisting the "starting point" for a
> >> >> reader, even if no records were read (yet), so that the next time the
> >> >> reader attempts to read, it will pick up from there. This has more to
> >> >> do with how the CheckpointMark handles this.
> >> >> I have to say that I'm not familiar with your DataXchange proposal; I
> >> >> will take a look though.
> >> >>
> >> >>> Regards
> >> >>>
> >> >>> JB
> >> >>>
> >> >>> --
> >> >>> Jean-Baptiste Onofré
> >> >>> jbonofre@apache.org
> >> >>> http://blog.nanthrax.net
> >> >>> Talend - http://www.talend.com
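Amit's answer to JB on item 4 (BEAM-704) is about persisting a reader's "starting point" even before it has read anything. It can be sketched in three steps:

- Resolve "latest" at most once.
- Store the result in the split's checkpoint.
- Reuse it after a restart.

`CheckpointMark` here is a hypothetical Python dataclass, not Beam's `UnboundedSource.CheckpointMark` interface. `fetch_latest` stands in for an assumed query to the Kafka cluster.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class CheckpointMark:
    """Hypothetical per-split checkpoint: the next offset to read, if known."""
    next_offset: Optional[int] = None


def start_offset(checkpoint: CheckpointMark, fetch_latest: Callable[[], int]) -> int:
    """Return where the reader should start, pinning "latest" on first use.

    If "latest" were fetched again on every (re)start, a reader that failed
    before reading anything could resume from a newer offset than the one
    it originally resolved, silently skipping the records in between.
    """
    if checkpoint.next_offset is None:
        # Resolve "latest" once and record it; the runner still has to
        # persist this checkpoint (e.g. before the micro-batch ends) for
        # the pinned offset to survive a failure.
        checkpoint.next_offset = fetch_latest()
    return checkpoint.next_offset
```

The other option raised in the original post is to fetch the offsets once during initial splitting. That would move the `fetch_latest` call into split generation. Either way, a recovering reader never re-resolves "latest".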

Re: [DISCUSS] UnboundedSource and the KafkaIO.

Posted by Robert Bradshaw <ro...@google.com.INVALID>.
Just looking to the future, have you given any thought to how well
this would work with https://s.apache.org/splittable-do-fn?


Re: [DISCUSS] UnboundedSource and the KafkaIO.

Posted by Maximilian Michels <mx...@apache.org>.
Thanks for the explanation, Amit!

What you described doesn't sound so different from how the Flink
Runner interfaces with the UnboundedSource interface. Setting aside the
mini batches and the discretization of the stream they require, the
checkpointing logic is pretty similar. The Flink wrapper doesn't use an
id to identify the checkpointed state because the state is kept per
operator and restored to each instance in case of a failure. In Spark,
the state is directly scoped by key. That actually makes a lot of sense
when you want to rescale a job, and that's the direction in which Flink
is currently improving its state interface.
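The difference between key-scoped and per-instance state can be sketched with two hypothetical helpers. Key-scoped state (Spark's model, keyed here by split id) can be redistributed across any number of workers. Per-instance operator state, in this simplified model of Flink's behavior at the time, can only be restored to the same number of instances. The `key % parallelism` placement is an illustrative choice, not either engine's actual partitioner.

```python
from typing import Dict, List, TypeVar

V = TypeVar("V")


def assign_keyed_state(state: Dict[int, V], parallelism: int) -> List[Dict[int, V]]:
    """Redistribute key-scoped state (keyed by split id) over any parallelism.

    Each entry carries its own key, so every key lands on exactly one of the
    new workers (chosen by key % parallelism), whatever the old layout was.
    """
    if parallelism < 1:
        raise ValueError("parallelism must be >= 1")
    shards: List[Dict[int, V]] = [{} for _ in range(parallelism)]
    for key, value in state.items():
        shards[key % parallelism][key] = value
    return shards


def restore_operator_state(snapshots: List[V], parallelism: int) -> List[V]:
    """Per-instance state: snapshot i is restored to instance i.

    The snapshots carry no keys, so this simplified model can only restore
    them at the parallelism they were taken with.
    """
    if len(snapshots) != parallelism:
        raise ValueError("per-instance state cannot be restored at a different parallelism")
    return list(snapshots)
```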


-Max


On Mon, Oct 10, 2016 at 3:35 PM, Amit Sela <am...@gmail.com> wrote:
> Thanks Max!
>
> I'll try to explain Spark's stateful operators and how/why I used them with
> UnboundedSource.
>
> Spark has two stateful operators: *updateStateByKey* and *mapWithState*.
> Since updateStateByKey is bound to output the (updated) state itself - the
> CheckpointMark in our case - we're left with mapWithState.
> mapWithState provides a persistent, distributed "map-like", that is
> partitioned according to the stream. This is indeed how I manage state
> between micro-batches.
> However, mapWithState (like any map) will give you a value (state)
> corresponding to a specific key, so I use a running-id from the initial
> splitting to identify the appropriate state.
> I took a look at Flink's implementation ( I do that sometimes ;-) ) and I
> could do the same and save the split source with the CheckpointMark but
> it'll still have to correspond to the same id, and since I had to wrap the
> split Source to perform a sort of "BoundedReadFromUnboundedSource" I simply
> added an id field and I'm hashing by that id.
> I'll also add that the stateful operator can only be applied to a
> (Pair)Stream and not to input operators so I'm actually generating a stream
> of splits (the same ones for every micro-batch) and reading from within the
> mappingFunction of the mapWithState.
>
> It's not the simplest design, but given how Spark's persistent state and
> InputDStream are designed comparing to the Beam model, I don't see another
> way - though I'd be happy to hear one!
>
> Pretty sure I've added this here but no harm in adding the link again: design
> doc
> <https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing>
> and
> a work-in-progress branch
> <https://github.com/amitsela/incubator-beam/tree/BEAM-658-WIP> all
> mentioned in the ticket <https://issues.apache.org/jira/browse/BEAM-658> as
> well.
> The design doc also relates to how "pure" Spark works with Kafka, which I
> think is interesting and very different from Flink/Dataflow.
>
> Hope this helped clear things up a little, please keep on asking if
> something is not clear yet.
>
> Thanks,
> Amit.
>
> On Mon, Oct 10, 2016 at 4:02 PM Maximilian Michels <mx...@apache.org> wrote:
>
>> Just to add a comment from the Flink side and its
>>
>> UnboundedSourceWrapper. We experienced the only way to guarantee
>>
>> deterministic splitting of the source, was to generate the splits upon
>>
>> creation of the source and then checkpoint the assignment during
>>
>> runtime. When restoring from a checkpoint, the same reader
>>
>> configuration is restored. It's not possible to change the splitting
>>
>> after the initial splitting has taken place. However, Flink will soon
>>
>> be able to repartition the operator state upon restart/rescaling of a
>>
>> job.
>>
>>
>>
>> Does Spark have a way to pass state of a previous mini batch to the
>>
>> current mini batch? If so, you could restore the last configuration
>>
>> and continue reading from the checkpointed offset. You just have to
>>
>> checkpoint before the mini batch ends.
>>
>>
>>
>> -Max

Re: [DISCUSS] UnboundedSource and the KafkaIO.

Posted by Amit Sela <am...@gmail.com>.
Thanks Max!

I'll try to explain Spark's stateful operators and how/why I used them with
UnboundedSource.

Spark has two stateful operators: *updateStateByKey* and *mapWithState*.
Since updateStateByKey is bound to output the (updated) state itself - the
CheckpointMark in our case - we're left with mapWithState.
mapWithState provides a persistent, distributed "map-like" structure that is
partitioned according to the stream. This is indeed how I manage state
between micro-batches.
However, mapWithState (like any map) will give you a value (state)
corresponding to a specific key, so I use a running-id from the initial
splitting to identify the appropriate state.
I took a look at Flink's implementation (I do that sometimes ;-)), and I
could do the same and save the split source along with the CheckpointMark,
but it would still have to correspond to the same id. Since I already had to
wrap the split Source to perform a sort of "BoundedReadFromUnboundedSource",
I simply added an id field and I'm hashing by that id.
I'll also add that the stateful operator can only be applied to a
(Pair)Stream and not to input operators, so I'm actually generating a stream
of splits (the same ones for every micro-batch) and reading from within the
mappingFunction of the mapWithState.
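
To make the keying scheme above concrete, here is a minimal sketch in plain Java. It assumes a HashMap stands in for mapWithState's persistent, partitioned state and a single offset stands in for the CheckpointMark; SplitStateSketch, IdentifiedSplit, OffsetMark and readMicroBatch are hypothetical names, not Beam or Spark APIs. The point it shows is that the splits are generated once with a running id, the same splits are replayed every micro-batch, and the id is what finds each split's previous mark.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: a keyed state store (standing in for Spark's mapWithState)
 * that maps a split's running id to its last CheckpointMark (here just an offset).
 */
public class SplitStateSketch {

    /** A split tagged with the running id assigned at initial splitting. */
    record IdentifiedSplit(int id, String partition) {}

    /** Stand-in for a CheckpointMark: the next offset to read. */
    record OffsetMark(long nextOffset) {}

    // Stand-in for the persistent, distributed state partitioned by key.
    private final Map<Integer, OffsetMark> stateById = new HashMap<>();

    /** Generate splits once, numbering them with a running id. */
    static List<IdentifiedSplit> initialSplits(List<String> partitions) {
        List<IdentifiedSplit> splits = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            splits.add(new IdentifiedSplit(i, partitions.get(i)));
        }
        return splits;
    }

    /**
     * "Mapping function" invoked once per split per micro-batch: look up the
     * previous mark by id, pretend to read the available records, store the new mark.
     */
    long readMicroBatch(IdentifiedSplit split, long recordsAvailable) {
        OffsetMark previous = stateById.getOrDefault(split.id(), new OffsetMark(0L));
        long next = previous.nextOffset() + recordsAvailable;
        stateById.put(split.id(), new OffsetMark(next));
        return next;
    }

    public static void main(String[] args) {
        SplitStateSketch runner = new SplitStateSketch();
        List<IdentifiedSplit> splits = initialSplits(List.of("topic-0", "topic-1"));
        // The same splits are fed to every micro-batch.
        for (int batch = 0; batch < 3; batch++) {
            for (IdentifiedSplit split : splits) {
                runner.readMicroBatch(split, split.id() + 1);
            }
        }
        System.out.println(runner.stateById.get(0).nextOffset()); // 3
        System.out.println(runner.stateById.get(1).nextOffset()); // 6
    }
}
```

The design choice to key by a running id (rather than by the split object itself) is what lets the state survive across micro-batches even though the split objects are re-created each time.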

It's not the simplest design, but given how Spark's persistent state and
InputDStream are designed compared to the Beam model, I don't see another
way - though I'd be happy to hear one!

Pretty sure I've added this here before, but there's no harm in adding the
links again: the design doc
<https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing>
and a work-in-progress branch
<https://github.com/amitsela/incubator-beam/tree/BEAM-658-WIP>, both also
mentioned in the ticket <https://issues.apache.org/jira/browse/BEAM-658>.
The design doc also relates to how "pure" Spark works with Kafka, which I
think is interesting and very different from Flink/Dataflow.

Hope this helped clear things up a little; please keep on asking if
something is still not clear.

Thanks,
Amit.

On Mon, Oct 10, 2016 at 4:02 PM Maximilian Michels <mx...@apache.org> wrote:

> Just to add a comment from the Flink side and its
> UnboundedSourceWrapper. We experienced the only way to guarantee
> deterministic splitting of the source, was to generate the splits upon
> creation of the source and then checkpoint the assignment during
> runtime. When restoring from a checkpoint, the same reader
> configuration is restored. It's not possible to change the splitting
> after the initial splitting has taken place. However, Flink will soon
> be able to repartition the operator state upon restart/rescaling of a
> job.
>
> Does Spark have a way to pass state of a previous mini batch to the
> current mini batch? If so, you could restore the last configuration
> and continue reading from the checkpointed offset. You just have to
> checkpoint before the mini batch ends.
>
> -Max

Re: [DISCUSS] UnboundedSource and the KafkaIO.

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi Max,

thanks for the explanation, it makes a lot of sense.

I'm not sure it will be that simple to pass state from one
micro-batch to the next. Let me take a look with Amit.

Regards
JB

On 10/10/2016 03:02 PM, Maximilian Michels wrote:
> Just to add a comment from the Flink side and its
> UnboundedSourceWrapper. We experienced the only way to guarantee
> deterministic splitting of the source, was to generate the splits upon
> creation of the source and then checkpoint the assignment during
> runtime. When restoring from a checkpoint, the same reader
> configuration is restored. It's not possible to change the splitting
> after the initial splitting has taken place. However, Flink will soon
> be able to repartition the operator state upon restart/rescaling of a
> job.
>
> Does Spark have a way to pass state of a previous mini batch to the
> current mini batch? If so, you could restore the last configuration
> and continue reading from the checkpointed offset. You just have to
> checkpoint before the mini batch ends.
>
> -Max

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: [DISCUSS] UnboundedSource and the KafkaIO.

Posted by Maximilian Michels <mx...@apache.org>.
Just to add a comment from the Flink side and its
UnboundedSourceWrapper. We found that the only way to guarantee
deterministic splitting of the source was to generate the splits upon
creation of the source and then checkpoint the assignment during
runtime. When restoring from a checkpoint, the same reader
configuration is restored. It's not possible to change the splitting
after the initial splitting has taken place. However, Flink will soon
be able to repartition the operator state upon restart/rescaling of a
job.
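
A minimal sketch of the "split once, checkpoint the assignment, restore it verbatim" pattern described above, in plain Java. It assumes splits are just partition names assigned to readers round-robin and that a reader's checkpoint holds its assigned splits plus per-split offsets; AssignmentCheckpointSketch, ReaderSnapshot, fresh and restore are hypothetical names, not Flink's UnboundedSourceWrapper code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of "split once, checkpoint the assignment, restore it verbatim". */
public class AssignmentCheckpointSketch {

    /** Snapshot of one reader: which splits it owns and where each one left off. */
    record ReaderSnapshot(List<String> splits, Map<String, Long> offsets) {}

    private final List<String> assignedSplits;
    private final Map<String, Long> offsets = new HashMap<>();

    private AssignmentCheckpointSketch(List<String> assignedSplits) {
        this.assignedSplits = List.copyOf(assignedSplits);
    }

    /** Fresh start: split once, assigning splits to readers round-robin. */
    static AssignmentCheckpointSketch fresh(List<String> allSplits, int readerIndex, int numReaders) {
        List<String> mine = new ArrayList<>();
        for (int i = 0; i < allSplits.size(); i++) {
            if (i % numReaders == readerIndex) {
                mine.add(allSplits.get(i));
            }
        }
        return new AssignmentCheckpointSketch(mine);
    }

    /** Restore: reuse the checkpointed assignment instead of splitting again. */
    static AssignmentCheckpointSketch restore(ReaderSnapshot snapshot) {
        AssignmentCheckpointSketch reader = new AssignmentCheckpointSketch(snapshot.splits());
        reader.offsets.putAll(snapshot.offsets());
        return reader;
    }

    /** Record that {@code records} more records were read from {@code split}. */
    void advance(String split, long records) {
        offsets.merge(split, records, Long::sum);
    }

    /** Capture the assignment together with the offsets. */
    ReaderSnapshot snapshot() {
        return new ReaderSnapshot(assignedSplits, Map.copyOf(offsets));
    }

    List<String> splits() { return assignedSplits; }

    long offset(String split) { return offsets.getOrDefault(split, 0L); }

    public static void main(String[] args) {
        AssignmentCheckpointSketch reader =
            fresh(List.of("topic-0", "topic-1", "topic-2", "topic-3"), 1, 2);
        reader.advance("topic-1", 42);
        ReaderSnapshot checkpoint = reader.snapshot();
        // After a failure, the reader is rebuilt from the checkpoint, not re-split.
        AssignmentCheckpointSketch recovered = restore(checkpoint);
        System.out.println(recovered.splits());          // [topic-1, topic-3]
        System.out.println(recovered.offset("topic-1")); // 42
    }
}
```

Because restore() never calls fresh() again, a restart cannot reshuffle splits between readers, which is also why the splitting cannot change after the initial split unless the runner can repartition operator state.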

Does Spark have a way to pass state of a previous mini batch to the
current mini batch? If so, you could restore the last configuration
and continue reading from the checkpointed offset. You just have to
checkpoint before the mini batch ends.

-Max

On Mon, Oct 10, 2016 at 10:38 AM, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
> Hi Amit,
>
> thanks for the explanation.
>
> For 4, you are right, it's slightly different from DataXchange (related to
> the elements in the PCollection). I think storing the "starting point" for a
> reader makes sense.
>
> Regards
> JB

Re: [DISCUSS] UnboundedSource and the KafkaIO.

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi Amit,

thanks for the explanation.

For 4, you are right, it's slightly different from DataXchange (related 
to the elements in the PCollection). I think storing the "starting 
point" for a reader makes sense.
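
As a rough illustration of storing the "starting point", here is a sketch assuming the mark records the resolved start offset as soon as it is fetched, so a recovered reader reuses it instead of asking the broker for "latest" again. Mark, start and afterReading are hypothetical names; this is not KafkaIO's actual CheckpointMark.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch: persist a reader's start offset even before any record is read. */
public class StartOffsetSketch {

    /** Stand-in for a CheckpointMark; startOffset is fixed once it has been resolved. */
    record Mark(long startOffset, long nextOffset) {}

    /**
     * Decide where a reader starts. A restored mark, even one with no records read
     * yet, wins; only a brand-new reader asks the broker for "latest".
     */
    static Mark start(Mark restored, LongSupplier fetchLatestOffset) {
        if (restored != null) {
            return restored;
        }
        long latest = fetchLatestOffset.getAsLong();
        return new Mark(latest, latest);
    }

    /** Advance the mark after reading {@code records} records. */
    static Mark afterReading(Mark mark, long records) {
        return new Mark(mark.startOffset(), mark.nextOffset() + records);
    }

    public static void main(String[] args) {
        Mark first = start(null, () -> 100L);          // "latest" resolved once: 100
        // The reader fails before reading anything; meanwhile "latest" moves to 150.
        Mark recovered = start(first, () -> 150L);
        System.out.println(recovered.nextOffset());    // 100, not 150
        System.out.println(afterReading(recovered, 7).nextOffset()); // 107
    }
}
```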

Regards
JB

On 10/10/2016 10:33 AM, Amit Sela wrote:
> Inline, thanks JB!
>
> On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré <jb...@nanthrax.net>
> wrote:
>
>> Hi Amit,
>>
>> For 1., the runner is responsible of the checkpoint storage (associated
>> with the source). It's the way for the runner to retry and know the
>> failed bundles.
>>
> True, this was a recap/summary of another, not-so-clear, thread.
>
>> For 4, are you proposing that KafkaRecord store additional metadata for
>> that ? It sounds like what I proposed in the "Technical Vision" appendix
>> document: there I proposed to introduce a DataXchange object that store
>> some additional metadata (like offset) used by the runner. It would be
>> the same with SDF as the tracker state should be persistent as well.
>>
> I think I was more focused on persisting the "starting point" for a reader,
> even if no records were read (yet), so that the next time the reader
> attempts to read it will pick of there. This has more to do with how the
> CheckpointMark handles this.
> I have to say that I'm not familiar with your DataXchange proposal, I will
> take a look though.
>
>> Regards
>> JB
>>
>> On 10/08/2016 01:55 AM, Amit Sela wrote:
>>
>>> I started a thread about (suggesting) UnboundedSource splitId's and it
>>> turned into an UnboundedSource/KafkaIO discussion, and I think it's best
>>> to start over in a clear [DISCUSS] thread.
>>>
>>> When working on UnboundedSource support for the Spark runner, I've raised
>>> some questions, some were general-UnboundedSource, and others
>>> Kafka-specific.
>>>
>>> I'd like to recap them here, and maybe have a more productive and
>>> well-documented discussion for everyone.
>>>
>>>    1. UnboundedSource id's - I assume any runner persists the
>>>    UnboundedSources's CheckpointMark for fault-tolerance, but I wonder how it
>>>    matches the appropriate split (of the UnboundedSource) to it's previously
>>>    persisted CheckpointMark in any specific worker ?
>>>    *Thomas Groh* mentioned that Source splits have to have an
>>>    associated identifier,
>>>    and so the runner gets to tag splits however it pleases, so long as
>>>    those tags don't allow splits to bleed into each other.
>>>    2. Consistent splitting - an UnboundedSource splitting seems to require
>>>    consistent splitting if it were to "pick-up where it left", correct ? this
>>>    is not mentioned as a requirement or a recommendation in
>>>    UnboundedSource#generateInitialSplits(), so is this a Kafka-only issue ?
>>>    *Raghu Angadi* mentioned that Kafka already does so by applying
>>>    partitions to readers in a round-robin manner.
>>>    *Thomas Groh* also added that while the UnboundedSource API doesn't
>>>    require deterministic splitting (although it's recommended), a
>>>    PipelineRunner
>>>    should keep track of the initially generated splits.
>>>    3. Support reading of Kafka partitions that were added to topic/s while
>>>    a Pipeline reads from them - BEAM-727
>>>    <https://issues.apache.org/jira/browse/BEAM-727> was filed.
>>>    4. Reading/persisting Kafka start offsets - since Spark works in
>>>    micro-batches, if "latest" was applied on a fairly sparse topic each worker
>>>    would actually begin reading only after it saw a message during the time
>>>    window it had to read messages. This is because fetching the offsets is
>>>    done by the worker running the Reader. This means that each Reader sees a
>>>    different state of "latest" (for his partition/s), such that a failing
>>>    Reader that hasn't read yet might fetch a different "latest" once it's
>>>    recovered then what it originally fetched. While this may not be as painful
>>>    for other runners, IMHO it lacks correctness and I'd suggest either reading
>>>    Kafka metadata of the Kafka cluster once upon initial splitting, or add
>>>    some of it to the CheckpointMark. Filed BEAM-704
>>>    <https://issues.apache.org/jira/browse/BEAM-704>.
>>>
>>> The original thread is called "Should UnboundedSource provide a split
>>
>>> identifier ?".
>>
>>>
>>
>>> While the only specific implementation of UnboundedSource discussed here
>> is
>>
>>> Kafka, it is probably the most popular open-source UnboundedSource.
>> Having
>>
>>> said that, I wonder where this meets PubSub ? or any other
>> UnboundedSource
>>
>>> that those questions might affect.
>>
>>>
>>
>>> Thanks,
>>
>>> Amit
>>
>>>
>>
>>
>>
>> --
>>
>> Jean-Baptiste Onofr�
>>
>> jbonofre@apache.org
>>
>> http://blog.nanthrax.net
>>
>> Talend - http://www.talend.com
>>
>>
>

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: [DISCUSS] UnboundedSource and the KafkaIO.

Posted by Amit Sela <am...@gmail.com>.
Inline, thanks JB!

On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:

> Hi Amit,
>
> For 1, the runner is responsible for the checkpoint storage (associated
> with the source). It's the way for the runner to retry and know the
> failed bundles.
>
True, this was a recap/summary of another, not-so-clear, thread.

> For 4, are you proposing that KafkaRecord store additional metadata for
> that? It sounds like what I proposed in the "Technical Vision" appendix
> document: there I proposed to introduce a DataXchange object that stores
> some additional metadata (like offset) used by the runner. It would be
> the same with SDF, as the tracker state should be persistent as well.
>
I think I was more focused on persisting the "starting point" for a reader,
even if no records were read (yet), so that the next time the reader
attempts to read, it will pick up from there. This has more to do with how
the CheckpointMark handles this.
I have to say that I'm not familiar with your DataXchange proposal, but I
will take a look.

> Regards
> JB

Re: [DISCUSS] UnboundedSource and the KafkaIO.

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi Amit,

For 1, the runner is responsible for the checkpoint storage (associated
with the source). It's the way for the runner to retry and know the
failed bundles.

For 4, are you proposing that KafkaRecord store additional metadata for
that? It sounds like what I proposed in the "Technical Vision" appendix
document: there I proposed to introduce a DataXchange object that stores
some additional metadata (like offset) used by the runner. It would be
the same with SDF (Splittable DoFn), as the tracker state should be
persistent as well.

Regards
JB

On 10/08/2016 01:55 AM, Amit Sela wrote:
> I started a thread about (suggesting) UnboundedSource splitIds and it
> turned into an UnboundedSource/KafkaIO discussion, and I think it's best to
> start over in a clear [DISCUSS] thread.
>
> When working on UnboundedSource support for the Spark runner, I've raised
> some questions, some were general-UnboundedSource, and others
> Kafka-specific.
>
> I'd like to recap them here, and maybe have a more productive and
> well-documented discussion for everyone.
>
>    1. UnboundedSource ids - I assume any runner persists the
>    UnboundedSource's CheckpointMark for fault-tolerance, but I wonder how it
>    matches the appropriate split (of the UnboundedSource) to its previously
>    persisted CheckpointMark in any specific worker?
>    *Thomas Groh* mentioned that Source splits have to have an associated
>    identifier, and so the runner gets to tag splits however it pleases, so
>    long as those tags don't allow splits to bleed into each other.
>    2. Consistent splitting - an UnboundedSource seems to require
>    consistent splitting if it is to "pick up where it left off", correct? This
>    is not mentioned as a requirement or a recommendation in
>    UnboundedSource#generateInitialSplits(), so is this a Kafka-only issue?
>    *Raghu Angadi* mentioned that Kafka already does so by assigning
>    partitions to readers in a round-robin manner.
>    *Thomas Groh* also added that while the UnboundedSource API doesn't
>    require deterministic splitting (although it's recommended), a
>    PipelineRunner should keep track of the initially generated splits.
>    3. Support reading of Kafka partitions that were added to topic/s while
>    a Pipeline reads from them - BEAM-727
>    <https://issues.apache.org/jira/browse/BEAM-727> was filed.
>    4. Reading/persisting Kafka start offsets - since Spark works in
>    micro-batches, if "latest" was applied on a fairly sparse topic, each worker
>    would actually begin reading only after it saw a message during the time
>    window it had to read messages. This is because fetching the offsets is
>    done by the worker running the Reader. This means that each Reader sees a
>    different state of "latest" (for its partition/s), such that a failing
>    Reader that hasn't read yet might fetch a different "latest" once it's
>    recovered than what it originally fetched. While this may not be as painful
>    for other runners, IMHO it lacks correctness and I'd suggest either reading
>    the Kafka cluster's metadata once upon initial splitting, or adding
>    some of it to the CheckpointMark. Filed BEAM-704
>    <https://issues.apache.org/jira/browse/BEAM-704>.
>
> The original thread is called "Should UnboundedSource provide a split
> identifier ?".
>
> While the only specific implementation of UnboundedSource discussed here is
> Kafka, it is probably the most popular open-source UnboundedSource. Having
> said that, I wonder where this meets PubSub, or any other UnboundedSource
> that those questions might affect.
>
> Thanks,
> Amit
>

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: [DISCUSS] UnboundedSource and the KafkaIO.

Posted by Amit Sela <am...@gmail.com>.
Thanks Raghu!

WIP PR for UnboundedSource support:
https://github.com/amitsela/incubator-beam/commits/BEAM-658-WIP
JIRA ticket: https://issues.apache.org/jira/browse/BEAM-658
Design document:
https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing

The PR still needs some polishing and better testing; I'll be working on
it this week.

Thanks,
Amit

On Sun, Oct 9, 2016 at 2:54 AM Raghu Angadi <ra...@google.com.invalid>
wrote:

> Thanks for the updates Amit.
>
> One follow-up: when does the Spark runner ask for the CheckpointMark from the
> reader (at the end of a micro-batch?)?
>
I've wrapped the entire Read within Spark's stateful operator
(mapWithState), so at the end of each read the CheckpointMark is saved.
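The following is a minimal Python simulation of that loop. It assumes a single split and uses a list as a stand-in for a Kafka partition. `CheckpointMark`, `read_micro_batch` and `run_batches` are hypothetical names, not Spark or Beam APIs. The dictionary plays the role of the per-split state that a stateful operator such as mapWithState would keep. Only a record bound is modeled; the time bound is left out.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class CheckpointMark:
    """Hypothetical checkpoint: the next offset this split should read."""
    next_offset: int


def read_micro_batch(log: List[str], checkpoint: Optional[CheckpointMark],
                     max_records: int) -> Tuple[List[str], CheckpointMark]:
    """Bounded read: at most max_records, starting where the last batch stopped."""
    start = checkpoint.next_offset if checkpoint is not None else 0
    records = log[start:start + max_records]
    return records, CheckpointMark(start + len(records))


def run_batches(log: List[str], num_batches: int, max_records: int) -> List[List[str]]:
    """Drive several micro-batches for one split, saving its checkpoint after each."""
    state: Dict[int, CheckpointMark] = {}  # split id -> last saved CheckpointMark
    split_id = 0
    outputs: List[List[str]] = []
    for _ in range(num_batches):
        records, mark = read_micro_batch(log, state.get(split_id), max_records)
        state[split_id] = mark  # saved at the end of every bounded read
        outputs.append(records)
    return outputs
```

Each batch resumes exactly where the previous one stopped, because the only thing carried between batches is the saved checkpoint.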

>
> Spark runner seems different enough from Dataflow and Flink that it is a
> good showcase for Beam runner API and implementations. It is no surprise
> that you are seeing some issues; I think we can work these out.
>
> I am interested in updating KafkaIO to support changes in Kafka partitions
> at run time. Looks like it would work for Spark too (as desiredNumSplits
> stays constant).
>
> You might not need to use 'BoundedReadFromUnboundedSource', but that could
> be a future improvement (e.g. Dataflow runner has full control of the life
> cycle of a source/reader).
>
I'm using a slightly different version of `BoundedReadFromUnboundedSource`,
and I think I have no choice because I have to bound the read somehow or
the micro-batch will grow indefinitely.

>
> I will take a look at the Spark runner. Do you have a branch that you are
> working with?
>
> Raghu.

Re: [DISCUSS] UnboundedSource and the KafkaIO.

Posted by Raghu Angadi <ra...@google.com.INVALID>.
Thanks for the updates Amit.

One follow-up: when does the Spark runner ask for the CheckpointMark from the
reader (at the end of a micro-batch?)?

Spark runner seems different enough from Dataflow and Flink that it is a
good showcase for Beam runner API and implementations. It is no surprise
that you are seeing some issues; I think we can work these out.

I am interested in updating KafkaIO to support changes in Kafka partitions
at run time. Looks like it would work for Spark too (as desiredNumSplits
stays constant).

You might not need to use 'BoundedReadFromUnboundedSource', but that could
be a future improvement (e.g. Dataflow runner has full control of the life
cycle of a source/reader).

I will take a look at the Spark runner. Do you have a branch that you are
working with?

Raghu.


On Sat, Oct 8, 2016 at 11:33 AM, Amit Sela <am...@gmail.com> wrote:

> Some answers inline.
> @Raghu I'll review the PR tomorrow.
>
> Thanks,
> Amit
>
> On Sat, Oct 8, 2016 at 3:47 AM Raghu Angadi <ra...@google.com.invalid>
> wrote:
>
> > On Fri, Oct 7, 2016 at 4:55 PM, Amit Sela <am...@gmail.com> wrote:
> >
> > >    3. Support reading of Kafka partitions that were added to topic/s while
> > >    a Pipeline reads from them - BEAM-727
> > >    <https://issues.apache.org/jira/browse/BEAM-727> was filed.
> > >
> >
> > I think this is doable (assuming some caveats about the
> > generateInitialSplits() contract). How important is this feature?
> >
> Well, I don't know how to estimate the importance, but I can say that Kafka
> allows this, and Spark Streaming supports this: the way Apache Spark
> currently reads from Kafka is by polling start/end offsets before
> scheduling every micro-batch job, so if a new partition suddenly appears
> it's simply added to the list of partitions to read from, and the driver
> program tracks the offsets.
> I assume Beam would like to support added partitions eventually, and from
> what I understand Dataflow will have to restart the pipeline (Flink too?),
> but that doesn't change the fact that we'll have to assign partitions in a
> way that keeps previously assigned partitions the same.
>
> >
> > Some basic questions about the Spark runner:
> >
> >    - Does the number of partitions stay the same during the life of a
> >    long-running Beam streaming job?
> >
> They should, yes; otherwise we'll have a consistency issue with partition
> assignment.
>
> >    - Will generateInitialSplits() be called more than once during the life
> >    of a job?
> >
> This is an important point! Yes, every "batch interval" Spark will execute
> a new "micro" job, and the runner will treat UnboundedSource as a sort of
> BoundedReadFromUnboundedSource with bounds on time (proportional to the
> interval time) and possibly records, so generateInitialSplits() will be
> re-evaluated each time. That's why the runner should keep the original
> (first) splitting, by supplying UnboundedSource with the same
> *desiredNumSplits* as negotiated at first (against the Spark backend -
> YARN/Mesos etc.).
>
> >    - When a job is restarted, is generateInitialSplits() invoked again?
> >       - if yes, do you expect 'desiredNumSplits' for
> >       generateInitialSplits() to stay the same as in the previous run?
> >
> As mentioned above, the execution itself is a constant
> "restart/resubmission" of jobs, and upon a "real" restart (due to failure
> recovery or user-initiated stop-start) the runner is supposed to preserve
> the original *desiredNumSplits*, and the UnboundedSource is expected to
> consistently re-assign the splits.
>
> >       - if no, are the readers instantiated from previous runs?
> >
>

Re: [DISCUSS] UnboundedSource and the KafkaIO.

Posted by Amit Sela <am...@gmail.com>.
Some answers inline.
@Raghu I'll review the PR tomorrow.

Thanks,
Amit

On Sat, Oct 8, 2016 at 3:47 AM Raghu Angadi <ra...@google.com.invalid>
wrote:

> On Fri, Oct 7, 2016 at 4:55 PM, Amit Sela <am...@gmail.com> wrote:
>
> >    3. Support reading of Kafka partitions that were added to topic/s while
> >    a Pipeline reads from them - BEAM-727
> >    <https://issues.apache.org/jira/browse/BEAM-727> was filed.
> >
>
> I think this is doable (assuming some caveats about the generateInitialSplits()
> contract). How important is this feature?
>
Well, I don't know how to estimate the importance, but I can say that Kafka
allows this, and Spark Streaming supports this: the way Apache Spark
currently reads from Kafka is by polling start/end offsets before
scheduling every micro-batch job, so if a new partition suddenly appears
it's simply added to the list of partitions to read from, and the driver
program tracks the offsets.
I assume Beam would like to support added partitions eventually, and from
what I understand Dataflow will have to restart the pipeline (Flink too?),
but that doesn't change the fact that we'll have to assign partitions in a
way that keeps previously assigned partitions the same.
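The sketch below illustrates that requirement with round-robin assignment by partition id. It assumes a single topic, and the helper names are hypothetical. Kafka only ever adds partitions with higher ids than the existing ones, so existing partitions keep their split as long as the number of splits is unchanged. Changing the split count breaks that guarantee. With several topics, sorting by (topic, partition) can shift indices, so this simple scheme would not be enough on its own.

```python
from typing import Dict, List


def assign(num_partitions: int, num_splits: int) -> Dict[int, List[int]]:
    """Round-robin partitions 0..num_partitions-1 of a single topic over splits."""
    assignment: Dict[int, List[int]] = {s: [] for s in range(num_splits)}
    for partition in range(num_partitions):
        assignment[partition % num_splits].append(partition)
    return assignment


def keeps_previous_assignment(old: Dict[int, List[int]],
                              new: Dict[int, List[int]]) -> bool:
    """True if no previously assigned partition moved to a different split."""
    return all(set(parts) <= set(new.get(split, [])) for split, parts in old.items())
```

Going from 4 to 6 partitions with 3 splits only appends partitions 4 and 5. Going from 3 to 4 splits moves partition 3 to a different split.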

>
> Some basic questions about the Spark runner:
>
>    - Does the number of partitions stay the same during the life of a
>    long-running Beam streaming job?
>
They should, yes; otherwise we'll have a consistency issue with partition
assignment.

>    - Will generateInitialSplits() be called more than once during the life
>    of a job?
>
This is an important point! Yes, every "batch interval" Spark will execute
a new "micro" job, and the runner will treat UnboundedSource as a sort of
BoundedReadFromUnboundedSource with bounds on time (proportional to the
interval time) and possibly records, so generateInitialSplits() will be
re-evaluated each time. That's why the runner should keep the original
(first) splitting, by supplying UnboundedSource with the same *desiredNumSplits*
as negotiated at first (against the Spark backend - YARN/Mesos etc.).
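The sketch below shows why the split count must be pinned. It assumes a hypothetical deterministic splitter that sorts partitions and deals them round-robin; this is an illustration, not KafkaIO's actual code. The runner re-splits on every micro-batch but always passes the same `desired_num_splits`. As a result, the checkpoint stored under a split index keeps referring to the same partitions.

```python
from typing import Dict, List


def generate_initial_splits(partitions: List[int],
                            desired_num_splits: int) -> List[List[int]]:
    """Hypothetical deterministic splitter: sort partitions, deal them round-robin."""
    n = max(1, min(desired_num_splits, len(partitions)))
    splits: List[List[int]] = [[] for _ in range(n)]
    for i, partition in enumerate(sorted(partitions)):
        splits[i % n].append(partition)
    return splits


class MicroBatchRunner:
    """Re-splits the source on every micro-batch, always with the same split count."""

    def __init__(self, desired_num_splits: int) -> None:
        # Fixed once (e.g. negotiated against the cluster) and reused for every batch.
        self.desired_num_splits = desired_num_splits
        # Runner-side state: split index -> {partition: next offset to read}.
        self.checkpoints: Dict[int, Dict[int, int]] = {}

    def run_batch(self, partitions: List[int], records_per_partition: int) -> None:
        splits = generate_initial_splits(partitions, self.desired_num_splits)
        for index, split in enumerate(splits):
            previous = self.checkpoints.get(index, {})
            # Deterministic splitting plus a pinned count means `previous`
            # describes exactly the partitions in `split`.
            self.checkpoints[index] = {
                p: previous.get(p, 0) + records_per_partition for p in split
            }
```

If the count changed between batches (for example from 2 to 3), the same split index would cover different partitions. The stored offsets would then be applied to the wrong readers.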

>    - When a job is restarted, is generateInitialSplits() invoked again?
>       - if yes, do you expect 'desiredNumSplits' for
>       generateInitialSplits() to stay the same as in the previous run?
>
As mentioned above, the execution itself is a constant
"restart/resubmission" of jobs, and upon a "real" restart (due to failure
recovery or user-initiated stop-start) the runner is supposed to preserve
the original *desiredNumSplits*, and the UnboundedSource is expected to
consistently re-assign the splits.

>       - if no, are the readers instantiated from previous runs?
>

Re: [DISCUSS] UnboundedSource and the KafkaIO.

Posted by Raghu Angadi <ra...@google.com.INVALID>.
On Fri, Oct 7, 2016 at 4:55 PM, Amit Sela <am...@gmail.com> wrote:

>    3. Support reading of Kafka partitions that were added to topic/s while
>    a Pipeline reads from them - BEAM-727
>    <https://issues.apache.org/jira/browse/BEAM-727> was filed.
>

I think this is doable (assuming some caveats about the generateInitialSplits()
contract). How important is this feature?

Some basic questions about the Spark runner:

   - Does the number of partitions stay the same during the life of a
   long-running Beam streaming job?
   - Will generateInitialSplits() be called more than once during the life
   of a job?
   - When a job is restarted, is generateInitialSplits() invoked again?
      - if yes, do you expect 'desiredNumSplits' for
      generateInitialSplits() to stay the same as in the previous run?
      - if no, are the readers instantiated from previous runs?

Re: [DISCUSS] UnboundedSource and the KafkaIO.

Posted by Raghu Angadi <ra...@google.com.INVALID>.
Regarding BEAM-704: sent a PR https://github.com/apache/incubator-beam/pull/1071
for setting 'consumedOffset' in the reader without waiting to read the
first record.
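The idea behind that fix can be sketched as follows. The sketch assumes a single partition and uses a callable as a stand-in for the broker's "latest offset" lookup. `SketchReader` and `PartitionCheckpoint` are hypothetical and do not mirror the PR's code. The point is only that the start offset is resolved once and is already part of the checkpoint before any record is read, so a restored reader does not re-resolve "latest".

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class PartitionCheckpoint:
    """Hypothetical checkpoint for one partition: the next offset to read."""
    next_offset: int


class SketchReader:
    """Records its start offset before reading any record (illustration only)."""

    def __init__(self, fetch_latest: Callable[[], int],
                 checkpoint: Optional[PartitionCheckpoint] = None) -> None:
        if checkpoint is not None:
            # Resuming: trust the checkpoint, never re-resolve "latest".
            self.next_offset = checkpoint.next_offset
        else:
            # Fresh start: resolve "latest" exactly once, right away.
            self.next_offset = fetch_latest()

    def advance(self, num_records: int) -> None:
        """Pretend num_records records were consumed."""
        self.next_offset += num_records

    def get_checkpoint_mark(self) -> PartitionCheckpoint:
        # Meaningful even when no record has been read yet.
        return PartitionCheckpoint(self.next_offset)
```

Suppose new messages arrive while a reader that never read anything is down. A reader restored from its checkpoint still starts at the originally resolved offset, not at the new "latest".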

On Fri, Oct 7, 2016 at 5:28 PM, Dan Halperin <dh...@google.com.invalid>
wrote:

> >    4. Reading/persisting Kafka start offsets - since Spark works in
> >    micro-batches, if "latest" was applied on a fairly sparse topic, each
> >    worker would actually begin reading only after it saw a message during
> >    the time window it had to read messages. This is because fetching the
> >    offsets is done by the worker running the Reader. This means that each
> >    Reader sees a different state of "latest" (for its partition/s), such
> >    that a failing Reader that hasn't read yet might fetch a different
> >    "latest" once it's recovered than what it originally fetched. While this
> >    may not be as painful for other runners, IMHO it lacks correctness and
> >    I'd suggest either reading the Kafka cluster's metadata once upon
> >    initial splitting, or adding some of it to the CheckpointMark. Filed
> >    BEAM-704 <https://issues.apache.org/jira/browse/BEAM-704>.
> >
>
> +1. This is a great point. The notion that a runner may stop a reader and
> resume it from a checkpoint frequently is definitely part of the Beam model
> -- right now Spark and Direct runners (at least) do it very often. The
> current behavior is definitely, if not broken... unexpected.
>
> Both proposed solutions make sense to me -- either log the last offset for
> all partitions during splitting, or simply log the previous offset in the
> checkpoint mark when we start reading for the first time.
>
>

Re: [DISCUSS] UnboundedSource and the KafkaIO.

Posted by Dan Halperin <dh...@google.com.INVALID>.
Thanks Amit!

A little bit inline.

On Fri, Oct 7, 2016 at 4:55 PM, Amit Sela <am...@gmail.com> wrote:

> I started a thread about (suggesting) UnboundedSource splitIds and it
> turned into an UnboundedSource/KafkaIO discussion, and I think it's best to
> start over in a clear [DISCUSS] thread.
>
> When working on UnboundedSource support for the Spark runner, I've raised
> some questions, some were general-UnboundedSource, and others
> Kafka-specific.
>
> I'd like to recap them here, and maybe have a more productive and
> well-documented discussion for everyone.
>
>    1. UnboundedSource ids - I assume any runner persists the
>    UnboundedSource's CheckpointMark for fault-tolerance, but I wonder how
>    it matches the appropriate split (of the UnboundedSource) to its
>    previously persisted CheckpointMark in any specific worker?
>    *Thomas Groh* mentioned that Source splits have to have an associated
>    identifier, and so the runner gets to tag splits however it pleases, so
>    long as those tags don't allow splits to bleed into each other.
>    2. Consistent splitting - an UnboundedSource seems to require
>    consistent splitting if it is to "pick up where it left off", correct?
>    This is not mentioned as a requirement or a recommendation in
>    UnboundedSource#generateInitialSplits(), so is this a Kafka-only issue?
>    *Raghu Angadi* mentioned that Kafka already does so by assigning
>    partitions to readers in a round-robin manner.
>    *Thomas Groh* also added that while the UnboundedSource API doesn't
>    require deterministic splitting (although it's recommended), a
>    PipelineRunner should keep track of the initially generated splits.
>
>

The usual approach (I think this answers both 1 & 2) is that a runner
persists the actual original source (which is required to be serializable)
along with the checkpoints (which are required to be encodable with a Coder).
This way, when the runner wants to resume from a checkpoint, it can use that
same source, and thus splitting need not be deterministic.
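The following is a minimal sketch of that approach. It assumes JSON as a stand-in both for Java serialization (for the source) and for a Coder (for the checkpoint). `KafkaSplit`, `Checkpoint` and `CheckpointStore` are hypothetical names for illustration only. What matters is that the runner resumes with the stored split, not with the output of a fresh call to the splitting logic.

```python
import json
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class KafkaSplit:
    """Hypothetical split: the partitions this split reads."""
    partitions: Tuple[int, ...]


@dataclass(frozen=True)
class Checkpoint:
    """Hypothetical checkpoint: (partition, next offset) pairs."""
    offsets: Tuple[Tuple[int, int], ...]


class CheckpointStore:
    """Runner-side storage that keeps each serialized split next to its checkpoint."""

    def __init__(self) -> None:
        self._entries: Dict[str, Tuple[str, str]] = {}

    def save(self, split_id: str, split: KafkaSplit, checkpoint: Checkpoint) -> None:
        # Persist the split itself, so resuming never depends on re-running
        # (possibly non-deterministic) splitting logic.
        split_blob = json.dumps(list(split.partitions))
        checkpoint_blob = json.dumps([list(pair) for pair in checkpoint.offsets])
        self._entries[split_id] = (split_blob, checkpoint_blob)

    def restore(self, split_id: str) -> Tuple[KafkaSplit, Checkpoint]:
        split_blob, checkpoint_blob = self._entries[split_id]
        split = KafkaSplit(tuple(json.loads(split_blob)))
        checkpoint = Checkpoint(tuple((p, o) for p, o in json.loads(checkpoint_blob)))
        return split, checkpoint
```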

It sounded like "pick up where it left off" is reminiscent of Dataflow's
pipeline update. That indeed requires
UnboundedSource#generateInitialSplits() to be deterministic, if sources
have configuration information, because we may try to link up new sources
with old checkpoints and we need to do so correctly. But of course this is
not super-awesome, because it's hard to make such functions deterministic,
especially if, e.g., the Kafka partitioning has changed in the meantime.


>    3. Support reading of Kafka partitions that were added to topic/s while
>    a Pipeline reads from them - BEAM-727
>    <https://issues.apache.org/jira/browse/BEAM-727> was filed.
>

See above :). Tough, but useful.


>    4. Reading/persisting Kafka start offsets - since Spark works in
>    micro-batches, if "latest" was applied on a fairly sparse topic, each
>    worker would actually begin reading only after it saw a message during
>    the time window it had to read messages. This is because fetching the
>    offsets is done by the worker running the Reader. This means that each
>    Reader sees a different state of "latest" (for its partition/s), such
>    that a failing Reader that hasn't read yet might fetch a different
>    "latest" once it's recovered than what it originally fetched. While this
>    may not be as painful for other runners, IMHO it lacks correctness and
>    I'd suggest either reading the Kafka cluster's metadata once upon
>    initial splitting, or adding some of it to the CheckpointMark. Filed
>    BEAM-704 <https://issues.apache.org/jira/browse/BEAM-704>.
>

+1. This is a great point. The notion that a runner may stop a reader and
resume it from a checkpoint frequently is definitely part of the Beam model
-- right now Spark and Direct runners (at least) do it very often. The
current behavior is definitely, if not broken... unexpected.

Both proposed solutions make sense to me -- either log the last offset for
all partitions during splitting, or simply log the previous offset in the
checkpoint mark when we start reading for the first time.
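The first option (logging offsets during splitting) might look like the sketch below. It assumes the caller has already looked up the latest offset of every partition, and the function names are hypothetical. Each split carries fixed start offsets, so a restarted reader starts from the same place without querying the broker again.

```python
from typing import Dict, List


def split_with_start_offsets(latest: Dict[int, int],
                             num_splits: int) -> List[Dict[int, int]]:
    """Fix every partition's start offset once, at split time.

    `latest` maps partition id -> latest offset observed while splitting.
    Partitions are dealt round-robin (in sorted order) over `num_splits` splits.
    """
    splits: List[Dict[int, int]] = [{} for _ in range(num_splits)]
    for i, partition in enumerate(sorted(latest)):
        splits[i % num_splits][partition] = latest[partition]
    return splits


def start_offset(split: Dict[int, int], partition: int) -> int:
    # No broker lookup here: the offset was fixed when the split was created,
    # so every (re)started reader of this split begins at the same place.
    return split[partition]
```

The second option, covered by the BEAM-704 PR, keeps splitting unchanged and instead records the resolved offset in the checkpoint mark as soon as the reader starts.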


>
> The original thread is called "Should UnboundedSource provide a split
> identifier ?".
>
> While the only specific implementation of UnboundedSource discussed here is
> Kafka, it is probably the most popular open-source UnboundedSource. Having
> said that, I wonder where this meets PubSub, or any other UnboundedSource
> that those questions might affect.
>

FWIW, Google Cloud Pub/Sub sources do not have state. Rather than connecting
to specific partitions, they just say "hey, give me some messages", so the
splitting is easy and deterministic for free. [link:
<https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java#L1043>]


>
> Thanks,
> Amit
>