Posted to dev@samza.apache.org by xinyu liu <xi...@gmail.com> on 2017/05/24 18:30:14 UTC

[DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

Hi all,

I created SEP-6 for SAMZA-1260
<https://issues.apache.org/jira/browse/SAMZA-1260>: Support Watermark
Across Intermediate Streams for Batch Processing. The link to the SEP is
here:

https://cwiki.apache.org/confluence/display/SAMZA/SEP-6+Support+Watermark+Across+Intermediate+Streams+for+Batch+Processing

Please review; comments are welcome!

Thanks,
Xinyu

Re: [DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

Posted by xinyu liu <xi...@gmail.com>.
Chatted with Yi offline and we both agree on keeping watermarks and
end-of-stream separate. @Chris: thanks for the detailed explanation.

@Yi: for your questions:

1) The propagation and reconciliation process is the same for watermarks
and end-of-stream. As we discussed, this can be done inside the consumer
TaskInstance. After reconciliation, a single eos/watermark message will be
emitted to the task, based on the control message type. For a watermark, we
will calculate the minimum watermark timestamp. For end-of-stream, it is an
empty envelope with the END-OF-STREAM offset, as we have today.

2) We need to checkpoint the control message bookkeeping state in the
TaskInstance along with the incoming message offsets. During failure
recovery, we will restore the state and continue processing future control
messages. Re-emission of watermarks from upstream will not cause issues,
since the state keeps the latest watermarks received. Any older watermarks
will be discarded.

3) Yes, the integration with future exactly-once processing is out-of-scope
for this SEP. I will sync with Becket to make sure the exactly-once marker
can use the mechanism I am building here (in-band control messages) for
propagation.
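
To make points 1) and 2) concrete, here is a minimal Java sketch of the
bookkeeping a consumer task might keep per intermediate stream. The class name
ControlMessageTracker, its method names, and the rule of waiting until every
upstream task has reported before emitting are assumptions made for
illustration; this is not the actual Samza implementation. The two collections
are the state that would be checkpointed with the input offsets.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

/** Hypothetical per-stream bookkeeping for in-band control messages. */
public class ControlMessageTracker {
    private final int upstreamTaskCount;                                // number of producer tasks
    private final Map<String, Long> latestWatermark = new HashMap<>();  // task name -> newest timestamp
    private final Set<String> endOfStreamTasks = new HashSet<>();       // tasks that sent end-of-stream
    private long lastEmitted = Long.MIN_VALUE;                          // last reconciled watermark

    public ControlMessageTracker(int upstreamTaskCount) {
        this.upstreamTaskCount = upstreamTaskCount;
    }

    /** Records a watermark; returns the reconciled minimum only when it advances. */
    public OptionalLong onWatermark(String taskName, long timestamp) {
        // Keep only the newest value per producer, so a re-emitted older
        // watermark after an upstream restart leaves the state unchanged.
        latestWatermark.merge(taskName, timestamp, Math::max);
        if (latestWatermark.size() < upstreamTaskCount) {
            return OptionalLong.empty(); // assumption: wait for every producer
        }
        long min = Long.MAX_VALUE;
        for (long ts : latestWatermark.values()) {
            min = Math.min(min, ts);
        }
        if (min <= lastEmitted) {
            return OptionalLong.empty(); // no emission if the minimum did not change
        }
        lastEmitted = min;
        return OptionalLong.of(min);
    }

    /** Records an end-of-stream; returns true exactly once, when all producers have ended. */
    public boolean onEndOfStream(String taskName) {
        boolean firstTime = endOfStreamTasks.add(taskName);
        return firstTime && endOfStreamTasks.size() == upstreamTaskCount;
    }
}
```

With two upstream tasks, watermarks of 100 and 50 reconcile to 50; a later
watermark of 40 from the second task is ignored, and a watermark of 120 from
that task advances the reconciled value to 100.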

I updated SEP-6 according to our discussion and changed its name to be more
general. New link:
https://cwiki.apache.org/confluence/display/SAMZA/SEP-6+Support+Control+Message+Across+Intermediate+Streams
Please take a look.

Thanks,
Xinyu



On Thu, Jun 1, 2017 at 10:41 AM, Chris Pettitt <
cpettitt@linkedin.com.invalid> wrote:

> I would recommend keeping watermarks and end-of-stream separate. It is
> lossy to represent end-of-stream as a watermark - does that mean we
> hit the max watermark on a stream or that we're in a bounded stream
> with an end-of-stream marker? Also keep in mind that watermarks will
> eventually be user overridable and it would be possible for a user to
> effectively emit an end-of-stream control message on an unbounded
> stream.

Re: [DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

Posted by Chris Pettitt <cp...@linkedin.com.INVALID>.
I would recommend keeping watermarks and end-of-stream separate. It is
lossy to represent end-of-stream as a watermark - does that mean we
hit the max watermark on a stream or that we're in a bounded stream
with an end-of-stream marker? Also keep in mind that watermarks will
eventually be user overridable and it would be possible for a user to
effectively emit an end-of-stream control message on an unbounded
stream.
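
The distinction can be sketched in Java as two separate control message types.
All names below (ControlMessages, Watermark, EndOfStream, shouldShutDown) are
hypothetical and chosen for illustration only; they are not taken from the SEP
or the Samza code base. The point of the sketch is that shutdown depends on the
message type, so a watermark carrying the largest possible timestamp is still
just a watermark.

```java
/** Hypothetical control message types, for illustration only (not Samza's actual API). */
public class ControlMessages {
    /** Common parent: every control message names the producer task that sent it. */
    public interface ControlMessage {
        String taskName();
    }

    /** Carries an event-time timestamp from the producer task. */
    public static final class Watermark implements ControlMessage {
        private final String taskName;
        private final long timestamp;

        public Watermark(String taskName, long timestamp) {
            this.taskName = taskName;
            this.timestamp = timestamp;
        }

        @Override public String taskName() { return taskName; }
        public long timestamp() { return timestamp; }
    }

    /** Signals that the producer task has completely processed a bounded stream. */
    public static final class EndOfStream implements ControlMessage {
        private final String taskName;

        public EndOfStream(String taskName) { this.taskName = taskName; }

        @Override public String taskName() { return taskName; }
    }

    /** Shutdown is driven by the message type, never by a sentinel timestamp. */
    public static boolean shouldShutDown(ControlMessage message) {
        return message instanceof EndOfStream;
    }
}
```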

On Wed, May 31, 2017 at 8:12 PM, Yi Pan <ni...@gmail.com> wrote:
> Hi, Xinyu,
>
> Thanks for the update. So I have two suggestions:
> - It seems to me that EndOfStream can be implemented as a special type of
> Watermark as well. a) We can use MAX_INT as the watermark value to indicate
> end-of-stream; b) the streamId is simply the key to the Map<String, Long>
> in the source ingestion task. When the source ingestion task has received
> enough EoS messages, it simply emits an EoS with its own taskName to the
> intermediate stream as a watermark, and the watermark propagation rule will
> work. The only thing the tasks do differently on EoS is shut down the
> current tasks, while non-EoS watermarks do not trigger shutdown. However,
> that will allow us to simplify the types of messages and data structures to
> pass through. And the reasoning for reconciliation in the downstream tasks
> is pretty simple: a) # of watermarks == # of upstream tasks (i.e.
> producers); b) the propagation rule for the watermark message is the same.
> - Based on our discussion yesterday, I think that we also need a detailed
> description in the design of the failure recovery scenario, especially to
> answer these questions: a) in failure recovery, how are the checkpoint of
> offsets in the input streams and the watermark checkpoint recovered in the
> current task? b) What's the correlation between the input offsets and
> watermarks in the checkpoint in the current task? c) What's the implication
> of re-emitted watermarks from the current task to the downstream tasks?
>
> And for Beam's watermark algorithm that Chris pointed out, I think that
> OldestWork(stage) would correspond to the watermark timestamp of some
> messages/state that we buffer in the current task and for which we have not
> yet generated the output or committed the state change. That would be
> needed if we implement the exactly-once algorithm Becket is working on,
> since the algorithm will require buffering messages/state that are not yet
> committed. For now, if we process each incoming message immediately and
> only checkpoint messages that have been processed completely, I think that
> we can ignore it.
>
> Just my few points.
>
> Thanks!
>
> -Yi
>
> On Tue, May 30, 2017 at 5:47 PM, xinyu liu <xi...@gmail.com> wrote:
>
>> @Chris: thanks a lot for providing the definitions. The first equation is
>> exactly what I want to say about the watermark reconciliation. I haven't
>> got to the second equation yet. Will probably think it through once I get
>> there.
>>
>> @Yi: I updated the SEP-6 based on your feedback. Some replies to your
>> questions are below:
>>
>> >> the proposal is for all types of control messages, not just for
>> end-of-stream, right? Better to define the scope and layout the comment
>> requirements of control message delivery.
>>
>> Right, the proposal is to support general control messages. I added
>> more content to the problem description about watermarks and also listed
>> supporting watermark propagation as one of the goals.
>>
>> >> in step-3, how does the consumer of intermediate streams know how many
>> EOS messages should be received? And we should make it clear that it should
>> be EOS / producer and the count of the downstream consumer is counting on
>> the number of unique EOS from all producers from the upstream.
>>
>> The control message itself currently contains the count of upstream
>> producers (tasks). Chris suggested earlier that if each processor has a
>> global view of each job model, then we can remove the count field. Right
>> now we are using this field to keep track of the total count. I also
>> updated the description of this part.
>>
>> >> In comparison table, “checkpoint the control messages received” ==> is
>> it referring to the partially accumulated upstream EOS messages?
>>
>> Yes, that's correct. We will checkpoint all the upstream tasks that have
>> reached end-of-stream for a streamId.
>>
>> >> Please make a clear definition on “Watermark” and “EndOfStream”. Why are
>> they different? Are they both control messages that requires the same
>> delivery pattern (i.e. broadcast to downstream, reconcile at the consumer)?
>> If yes, should we make the “watermark” vs “EndOfStream” a sub-category in
>> control message?
>>
>> They are different: a watermark contains a timestamp from the producer
>> task, while an EndOfStream message indicates that the producer task has
>> completely processed a stream. Both are control messages that require the
>> same delivery pattern. I updated the SEP to make it clearer that they are
>> sub-categories of control message.
>>
>> >> As for the serde for intermediate stream, I assume that we will need an
>> envelope serde that is avro to wrap the user message and control message
>> in? So, user-defined serde now only applies to the “UserMessage”? And
>> what’s the message key in the message format?
>>
>> The serde wrapper for the message is customized: the first byte indicates
>> the message type, and the following byte array is the actual message. For
>> user messages, we will apply the user-provided serde. For control messages,
>> we will use JSON. The key is the same. We do not need a customized serde,
>> since we can infer the serde from the message.
>>
>> >> A big question regarding to the watermark propagation: “When Samza
>> receives watermark messages, it will emit a watermark with the earliest
>> event time across all the stream partitions. No emission if the earliest
>> event time doesn’t change.” Does the watermark propagation requires
>> synchronization/coordination between all producers at the source? Say, if
>> the task taking one input source emits watermark at 1min interval and the
>> task taking another input source emits watermark at 5min interval, how does
>> the downstream consumer reconcile the watermarks?
>>
>> Watermark propagation does not require synchronization. Chris's equations
>> are very accurate about how the calculations work. Please take a look.
>>
>> >> In the checkpoint message format, it seems that it is only design for
>> watermark messages? Any streamId info that EoS is carrying over?
>>
>> Sorry, I forgot to add the Eos checkpoint there. I updated the SEP for it.
>> Now the EOS checkpoint has the streamId.
>>
>> Thanks,
>> Xinyu
>>
>> On Tue, May 30, 2017 at 11:03 AM, Chris Pettitt <
>> cpettitt@linkedin.com.invalid> wrote:
>>
>> > FWIW, there is a Beam presentation that has a very crisp set of rules
>> > around watermarks. From memory it boils down to something like:
>> >
>> > InputWatermark(stage) =
>> >     min { OutputWatermark(stage') for stage' in Upstream(stage) }
>> > OutputWatermark(stage) =
>> >     min { InputWatermark(stage), OldestWork(stage) }
>> >
>> > OldestWork(stage) is the oldest message that has been received by the
>> > stage but not yet processed.
>> >
>> > - Chris
>> >
>> > On Tue, May 30, 2017 at 1:39 PM, Yi Pan <ni...@gmail.com> wrote:
>> >
>> > > Hi, Xinyu,
>> > >
>> > > Thanks for the proposal. I took a quick pass and had the following
>> > > questions/comments:
>> > >
>> > > - message shuffling ==> data shuffling???
>> > >
>> > > - the proposal is for all types of control messages, not just for
>> > > end-of-stream, right? Better to define the scope and lay out the comment
>> > > requirements of control message delivery.
>> > >
>> > > - dropped option should go to “Rejected alternatives”
>> > >
>> > > - “Samza finds out the following intermediate streams that all the
>> > > inputs have been end-of-stream” what does it mean? The task consuming
>> > > the input stream(s) reconciles all EoS from all input stream partitions
>> > > and then propagates EoS messages to all partitions in intermediate
>> > > streams? This is not super clear to me.
>> > >
>> > > - in step-3, how does the consumer of intermediate streams know how
>> > > many EOS messages should be received? And we should make it clear that
>> > > it should be EOS / producer and the count of the downstream consumer is
>> > > counting on the number of unique EOS from all producers from the
>> > > upstream.
>> > >
>> > > - In comparison table, “checkpoint the control messages received” ==>
>> > > is it referring to the partially accumulated upstream EOS messages?
>> > >
>> > > - Please make a clear definition on “Watermark” and “EndOfStream”. Why
>> > > are they different? Are they both control messages that require the
>> > > same delivery pattern (i.e. broadcast to downstream, reconcile at the
>> > > consumer)? If yes, should we make the “watermark” vs “EndOfStream” a
>> > > sub-category in control message?
>> > >
>> > > - As for the serde for intermediate stream, I assume that we will need
>> > > an envelope serde that is avro to wrap the user message and control
>> > > message in? So, user-defined serde now only applies to the
>> > > “UserMessage”? And what’s the message key in the message format?
>> > >
>> > > - A big question regarding the watermark propagation: “When Samza
>> > > receives watermark messages, it will emit a watermark with the earliest
>> > > event time across all the stream partitions. No emission if the
>> > > earliest event time doesn’t change.” Does the watermark propagation
>> > > require synchronization/coordination between all producers at the
>> > > source? Say, if the task taking one input source emits watermark at
>> > > 1min interval and the task taking another input source emits watermark
>> > > at 5min interval, how does the downstream consumer reconcile the
>> > > watermarks?
>> > >
>> > > - In the checkpoint message format, it seems that it is only designed
>> > > for watermark messages? Any streamId info that EoS is carrying over?
>> > >
>> > >
>> > > Thanks a lot!
>> > >
>> > >
>> > > -Yi
>> > >
>> > > On Tue, May 30, 2017 at 9:46 AM, xinyu liu <xi...@gmail.com> wrote:
>> > >
>> > > > Makes sense. I noticed that too and I dropped the ControlMessage type
>> > > > in my code. I also moved taskName, taskCount to the parent
>> > > > ControlMessage class. Just updated the SEP-6. Please take a look again.
>> > > >
>> > > > Thanks,
>> > > > Xinyu
>> > > >
>> > > > On Tue, May 30, 2017 at 9:12 AM, Chris Pettitt <
>> > > > cpettitt@linkedin.com.invalid> wrote:
>> > > >
>> > > > > MessageType and ControlMessage.Type look redundant. You could either
>> > > > > use "ControlMessage" as the type in MessageType or drop
>> > > > > ControlMessage.Type.
>> > > > >
>> > > > > On Fri, May 26, 2017 at 5:14 PM, xinyu liu <xi...@gmail.com> wrote:
>> > > > >
>> > > > > > Thanks a lot for the comments. I updated the SEP with more details
>> > > > > > and clarification. Please let me know if you have further questions.
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Xinyu
>> > > > > >
>> > > > > > On Thu, May 25, 2017 at 11:19 AM, Prateek Maheshwari <
>> > > > > > pmaheshwari@linkedin.com.invalid> wrote:
>> > > > > >
>> > > > > > > Hi Xinyu,
>> > > > > > >
>> > > > > > > Thanks for the proposal. Some requests for clarifications.
>> > > > > > > Let's update the SEP directly instead of replying here.
>> > > > > > >
>> > > > > > > E.g., in "For any following intermediate stream whose input
>> > > > > > > streams are all end-of-stream, it will be marked as pending
>> > > > > > > EOS" - Should clarify that (IIUC) something is injecting EOS
>> > > > > > > messages in all intermediate stream partitions once it receives
>> > > > > > > EOS from all input stream partitions it's consuming. Should also
>> > > > > > > clarify what is that something.
>> > > > > > > Same for "declare end of stream once all the EOS messages have
>> > > > > > > been received." - What does this declaration involve and who is
>> > > > > > > doing this?
>> > > > > > >
>> > > > > > > In pro for approach 2: Not clear what this means - "The
>> > > > > > > watermark can conclude the input messages before this watermark
>> > > > > > > have been complete."
>> > > > > > >
>> > > > > > > For the cons of approach 2: "Complicated failure scenario of
>> > > > > > > the second job. It needs to checkpoint all the watermark
>> > > > > > > messages received, so when it recovered from failure, it can
>> > > > > > > still count." - How is this related to EOS? How is this related
>> > > > > > > to the checkpoint watermark section?
>> > > > > > > Also, what is the "more messages required to write.. "
>> > > > > > > referring to?
>> > > > > > >
>> > > > > > > "Samza needs to reconcile based on the task counts." - Please
>> > > > > > > explain what reconciliation means, why it needs to happen, and
>> > > > > > > why we need to track the producer task and total task count in
>> > > > > > > the watermark message to do this.
>> > > > > > >
>> > > > > > > Checkpoint watermarks section is also unclear. What problem are
>> > > > > > > we trying to solve here?
>> > > > > > >
>> > > > > > > Should also move the message format and the watermark message
>> > > > > > > interface sections to the bottom, since they depend on details
>> > > > > > > in the event time and checkpoint watermark sections.
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Prateek
>> > > > > > >
>> > > > > > >
>> > > > > > > On Wed, May 24, 2017 at 11:30 AM, xinyu liu <
>> > xinyuliu.us@gmail.com
>> > > >
>> > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hi all,
>> > > > > > > >
>> > > > > > > > I created SEP-6 for SAMZA-1260
>> > > > > > > > <https://issues.apache.org/jira/browse/SAMZA-1260>: Support
>> > > > > > > > Watermark Across Intermediate Streams for Batch Processing. The
>> > > > > > > > link to the SEP is here:
>> > > > > > > >
>> > > > > > > > https://cwiki.apache.org/confluence/display/SAMZA/SEP-6+Support+Watermark+Across+Intermediate+Streams+for+Batch+Processing
>> > > > > > > >
>> > > > > > > > Please review and comments are welcome!
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > > Xinyu
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>

Re: [DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

Posted by Yi Pan <ni...@gmail.com>.
Hi, Xinyu,

Thanks for the update. I have two suggestions:
- It seems to me that EndOfStream can be implemented as a special type of
Watermark as well: a) we can use MAX_INT as the watermark value to indicate
end-of-stream; b) the streamIds are simply the keys of the Map<String,
Long> in the source ingestion task. When the source ingestion task has
received enough EoS messages, it simply emits an EoS with its own taskName
to the intermediate stream as a watermark, and the watermark propagation
rule will work. The only difference is that on EoS the tasks shut
themselves down, while non-EoS watermarks do not trigger a shutdown. That
would allow us to simplify the message types and the data structures passed
through. The reasoning for reconciliation in the downstream tasks is also
pretty simple: a) # of watermarks == # of upstream tasks (i.e.
producers); b) the propagation rule for the watermark message is the same.
- Based on our discussion yesterday, I think we also need a detailed
description in the design of the failure recovery scenario,
especially to answer these questions: a) in failure recovery, how are the
checkpoint of offsets in the input streams and the watermark checkpoint
recovered in the current task? b) what's the correlation between the input
offsets and the watermarks in the checkpoint in the current task? c) what's
the implication of re-emitted watermarks from the current task for the
downstream tasks?
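
A minimal sketch of the downstream reconciliation suggested above, written
under stated assumptions: WatermarkTracker and its method names are
hypothetical (not Samza APIs), and Long.MAX_VALUE stands in for the MAX_INT
end-of-stream marker because the map values are Long. The tracker keeps the
latest watermark per upstream task, emits the minimum once every task has
reported, and drops older or repeated watermarks.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper for illustration only; not part of Samza.
final class WatermarkTracker {
    // Assumed sentinel: a watermark with this value means end-of-stream.
    static final long END_OF_STREAM = Long.MAX_VALUE;

    private final int upstreamTaskCount;
    // Latest watermark timestamp seen from each upstream producer task.
    private final Map<String, Long> latestByTask = new HashMap<>();
    private long lastEmitted = Long.MIN_VALUE;

    WatermarkTracker(int upstreamTaskCount) {
        this.upstreamTaskCount = upstreamTaskCount;
    }

    // Records a watermark from one upstream task. Returns the new aggregate
    // watermark if it advanced, or empty if nothing should be emitted.
    OptionalLong update(String taskName, long timestamp) {
        // Keep only the newest value per task, so re-emitted older
        // watermarks are ignored.
        latestByTask.merge(taskName, timestamp, Long::max);
        if (latestByTask.size() < upstreamTaskCount) {
            return OptionalLong.empty(); // not all producers have reported yet
        }
        long min = Long.MAX_VALUE;
        for (long t : latestByTask.values()) {
            min = Math.min(min, t);
        }
        if (min <= lastEmitted) {
            return OptionalLong.empty(); // earliest event time did not change
        }
        lastEmitted = min;
        return OptionalLong.of(min);
    }

    // True when the aggregate watermark says every producer has finished.
    static boolean isEndOfStream(long watermark) {
        return watermark == END_OF_STREAM;
    }
}
```

With this shape, the only EoS-specific step left to the caller is to shut
the task down when isEndOfStream returns true for the emitted value.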

As for Beam's watermark algorithm that Chris pointed out, I think that
OldestWork(stage) would correspond to the watermark timestamp of
messages/state that we buffer in the current task and for which we have not
yet generated output or committed the state change. That would be needed if
we implement the exactly-once algorithm Becket is working on, since that
algorithm requires buffering messages/state that are not committed yet. For
now, if we process each incoming message immediately and only checkpoint
messages that have been processed completely, I think we can ignore it.

Just my few points.

Thanks!

-Yi

Re: [DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

Posted by xinyu liu <xi...@gmail.com>.
@Chris: thanks a lot for providing the definitions. The first equation is
exactly what I want to say about the watermark reconciliation. I haven't
got to the second equation yet. Will probably think it through once I get
there.

@Yi: I updated the SEP-6 based on your feedback. Some replies to your
questions are below:

>> the proposal is for all types of control messages, not just for
end-of-stream, right? Better to define the scope and lay out the common
requirements of control message delivery.

Right, the proposal is to support general control messages. I added
more content in the problem description about watermarks and also listed
supporting watermark propagation as one of the goals.

>> in step-3, how does the consumer of intermediate streams know how many
EOS messages should be received? And we should make it clear that it should
be one EOS per producer, and that the downstream consumer counts the number
of unique EOS messages from all upstream producers.

The control message itself currently contains the count of upstream
producers (tasks). Chris suggested earlier that if each processor has a
global view of each job model, then we can remove the count field. Right now
we are using this field to keep track of the total count. I also updated
the description of this part.
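
A minimal sketch of that counting, assuming each EOS control message carries
a streamId, the producer's taskName, and the total upstream taskCount. The
class and method names are hypothetical, not the actual Samza classes:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Samza.
final class EndOfStreamTracker {
    // streamId -> names of upstream tasks that have already sent EOS.
    private final Map<String, Set<String>> finishedTasks = new HashMap<>();

    // Records one EOS message. Returns true once EOS has been received from
    // taskCount distinct upstream tasks for the given stream.
    boolean onEndOfStream(String streamId, String taskName, int taskCount) {
        Set<String> seen =
            finishedTasks.computeIfAbsent(streamId, k -> new HashSet<>());
        seen.add(taskName); // a set, so a re-sent EOS is not counted twice
        return seen.size() >= taskCount;
    }
}
```

Tracking task names rather than a bare counter keeps the count correct when
an upstream task re-emits its EOS after a restart.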

>> In the comparison table, “checkpoint the control messages received” ==> is
it referring to the partially accumulated upstream EOS messages?

Yes, that's correct. We will checkpoint all the upstream tasks that have
reached end-of-stream for a streamId.

>> Please make a clear definition on “Watermark” and “EndOfStream”. Why are
they different? Are they both control messages that require the same
delivery pattern (i.e. broadcast to downstream, reconcile at the consumer)?
If yes, should we make the “watermark” vs “EndOfStream” a sub-category in
control message?

They are different: a watermark contains a timestamp from the producer task,
while an EndOfStream message indicates the producer task has completely
processed a stream. Both are control messages that require the same
delivery pattern. I updated the SEP to make it clearer that they are
sub-categories of control message.

>> As for the serde for intermediate stream, I assume that we will need an
envelope serde that is Avro to wrap the user message and control message
in? So, user-defined serde now only applies to the “UserMessage”? And
what’s the message key in the message format?

The serde wrapper for the message is customized: the first byte indicates
the message type, and the following byte array is the actual message. For
user messages, we will apply the user-provided serde. For control messages,
we will use JSON. The key is the same. We do not need a customized serde
since we can infer the serde from the message.
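
A minimal sketch of that framing, assuming one leading type byte followed by
the already-serialized payload. The class name and the numeric type codes
are illustrative assumptions, not the values used in the SEP:

```java
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of Samza.
final class IntermediateMessageFraming {
    // Assumed type codes; the real values are whatever the SEP defines.
    static final byte USER_MESSAGE = 0;
    static final byte WATERMARK = 1;
    static final byte END_OF_STREAM = 2;

    private IntermediateMessageFraming() {}

    // Prepends the type byte to a payload produced by the matching serde
    // (user serde for user messages, JSON for control messages).
    static byte[] wrap(byte type, byte[] payload) {
        byte[] framed = new byte[payload.length + 1];
        framed[0] = type;
        System.arraycopy(payload, 0, framed, 1, payload.length);
        return framed;
    }

    // Reads the type byte so the consumer can pick the right serde.
    static byte typeOf(byte[] framed) {
        return framed[0];
    }

    // Returns the payload bytes that follow the type byte.
    static byte[] payloadOf(byte[] framed) {
        return Arrays.copyOfRange(framed, 1, framed.length);
    }
}
```

On the consumer side, typeOf decides whether payloadOf is handed to the user
serde or parsed as a JSON control message.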

>> A big question regarding the watermark propagation: “When Samza
receives watermark messages, it will emit a watermark with the earliest
event time across all the stream partitions. No emission if the earliest
event time doesn’t change.” Does the watermark propagation require
synchronization/coordination between all producers at the source? Say, if
the task taking one input source emits watermarks at a 1-min interval and the
task taking another input source emits watermarks at a 5-min interval, how
does the downstream consumer reconcile the watermarks?

Watermark propagation does not require synchronization. Chris's equations
describe accurately how the calculations work. Please take a look.

>> In the checkpoint message format, it seems that it is only designed for
watermark messages? Any streamId info that EoS is carrying over?

Sorry, I forgot to add the EOS checkpoint there. I updated the SEP for it.
Now the EOS checkpoint has the streamId.

Thanks,
Xinyu

Re: [DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

Posted by Chris Pettitt <cp...@linkedin.com.INVALID>.
FWIW, there is a Beam presentation that has a very crisp set of rules
around watermarks. From memory it boils down to something like:

InputWatermark(stage) = min { OutputWatermark(stage') for stage' in
Upstream(stage) }
OutputWatermark(stage) = min { InputWatermark(stage), OldestWork(stage) }

OldestWork(stage) is the oldest message that has been received by the stage
but not yet processed.
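
The two rules above can be sketched as follows. This is an illustration of
the equations as recalled here, not Beam or Samza code; StageWatermarks and
its method names are hypothetical, and timestamps are assumed to be epoch
milliseconds.

```java
import java.util.Collection;
import java.util.OptionalLong;

// Hypothetical helper for illustration only; not part of Beam or Samza.
final class StageWatermarks {
    private StageWatermarks() {}

    // InputWatermark(stage) = min { OutputWatermark(stage') for stage' in
    // Upstream(stage) }. With no upstream stages this returns
    // Long.MAX_VALUE, which is an assumption made for this sketch.
    static long inputWatermark(Collection<Long> upstreamOutputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : upstreamOutputWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    // OutputWatermark(stage) = min { InputWatermark(stage), OldestWork(stage) }.
    // oldestWork is empty when the stage holds no unprocessed messages.
    static long outputWatermark(long inputWatermark, OptionalLong oldestWork) {
        return oldestWork.isPresent()
            ? Math.min(inputWatermark, oldestWork.getAsLong())
            : inputWatermark;
    }
}
```

Because each stage only takes a minimum over the latest values it has seen,
upstream producers can emit watermarks at different intervals without
coordinating with each other.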

- Chris

On Tue, May 30, 2017 at 1:39 PM, Yi Pan <ni...@gmail.com> wrote:

> Hi, Xinyu,
>
> Thanks for the proposal. I took a quick pass and had the following
> questions/comments:
>
> - message shuffling ==> data shuffling???
>
> - the proposal is for all types of control messages, not just for
> end-of-stream, right? Better to define the scope and layout the comment
> requirements of control message delivery.
>
> - dropped option should go to “Rejected alternatives”
>
> - “Samza finds out the following intermediate streams that all the inputs
> have been end-of-stream” what does it mean? The task consuming the input
> stream(s) reconcile all EoS from all input stream partitions and then
> propagate EoS messages to all partitions in intermediate streams? This is
> not super clear to me.
>
> - in step-3, how does the consumer of intermediate streams know how many
> EOS messages should be received? And we should make it clear that it should
> be EOS / producer and the count of the downstream consumer is counting on
> the number of unique EOS from all producers from the upstream.
>
> - In comparison table, “checkpoint the control messages received” ==> is it
> referring to the partially accumulated upstream EOS messages?
>
> - Please make a clear definition on “Watermark” and “EndOfStream”. Why are
> they different? Are they both control messages that requires the same
> delivery pattern (i.e. broadcast to downstream, reconcile at the consumer)?
> If yes, should we make the “watermark” vs “EndOfStream” a sub-category in
> control message?
>
> - As for the serde for intermediate stream, I assume that we will need an
> envelope serde that is avro to wrap the user message and control message
> in? So, user-defined serde now only applies to the “UserMessage”? And
> what’s the message key in the message format?
>
> - A big question regarding to the watermark propagation: “When Samza
> receives watermark messages, it will emit a watermark with the earliest
> event time across all the stream partitions. No emission if the earliest
> event time doesn’t change.” Does the watermark propagation requires
> synchronization/coordination between all producers at the source? Say, if
> the task taking one input source emits watermark at 1min interval and the
> task taking another input source emits watermark at 5min interval, how does
> the downstream consumer reconcile the watermarks?
>
> - In the checkpoint message format, it seems that it is only design for
> watermark messages? Any streamId info that EoS is carrying over?
>
>
> Thanks a lot!
>
>
> -Yi
>
> On Tue, May 30, 2017 at 9:46 AM, xinyu liu <xi...@gmail.com> wrote:
>
> > Makes sense. I noticed that too and I dropped the ControlMessage type in
> > my code. I also moved taskName, taskCount to the parent ControlMessage
> > class. Just updated the SEP-6. Please take a look again.
> >
> > Thanks,
> > Xinyu
> >
> > On Tue, May 30, 2017 at 9:12 AM, Chris Pettitt <
> > cpettitt@linkedin.com.invalid> wrote:
> >
> > > MessageType and ControlMessage.Type look redundant. You could either
> > > use "ControlMessage" as the type in MessageType or drop
> > > ControlMessage.Type.
> > >
> > > On Fri, May 26, 2017 at 5:14 PM, xinyu liu <xi...@gmail.com>
> > > wrote:
> > >
> > > > Thanks a lot for the comments. I updated the SEP with more details
> > > > and clarification. Please let me know if you have further questions.
> > > >
> > > > Thanks,
> > > > Xinyu
> > > >
> > > > On Thu, May 25, 2017 at 11:19 AM, Prateek Maheshwari <
> > > > pmaheshwari@linkedin.com.invalid> wrote:
> > > >
> > > > > Hi Xinyu,
> > > > >
> > > > > Thanks for the proposal. Some requests for clarifications. Let's
> > > > > update the SEP directly instead of replying here.
> > > > >
> > > > > E.g., in "For any following intermediate stream whose input
> > > > > streams are all end-of-stream, it will be marked as pending EOS" -
> > > > > Should clarify that (IIUC) something is injecting EOS messages in
> > > > > all intermediate stream partitions once it receives EOS from all
> > > > > input stream partitions it's consuming. Should also clarify what
> > > > > is that something.
> > > > > Same for "declare end of stream once all the EOS messages have
> > > > > been received." - What does this declaration involve and who is
> > > > > doing this?
> > > > >
> > > > > In pro for approach 2: Not clear what this means - "The watermark
> > > > > can conclude the input messages before this watermark have been
> > > > > complete."
> > > > >
> > > > > For the cons of approach 2: "Complicated failure scenario of the
> > > > > second job. It needs to checkpoint all the watermark messages
> > > > > received, so when it recovered from failure, it can still count."
> > > > > - How is this related to EOS?
> > > > > How is this related to the checkpoint watermark section?
> > > > > Also, what is the "more messages required to write.. " referring
> > > > > to?
> > > > >
> > > > > "Samza needs to reconcile based on the task counts." - Please
> > > > > explain what reconciliation means, why it needs to happen, and why
> > > > > we need to track the producer task and total task count in the
> > > > > watermark message to do this.
> > > > >
> > > > > Checkpoint watermarks section is also unclear. What problem are we
> > > > > trying to solve here?
> > > > >
> > > > > Should also move the message format and the watermark message
> > > > > interface sections to the bottom, since they depend on details in
> > > > > the event time and checkpoint watermark sections.
> > > > >
> > > > > Thanks,
> > > > > Prateek
> > > > >
> > > > >
> > > > > On Wed, May 24, 2017 at 11:30 AM, xinyu liu
> > > > > <xinyuliu.us@gmail.com> wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I created SEP-6 for SAMZA-1260
> > > > > > <https://issues.apache.org/jira/browse/SAMZA-1260>: Support
> > > > > > Watermark Across Intermediate Streams for Batch Processing. The
> > > > > > link to the SEP is here:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/SAMZA/SEP-6+Support+Watermark+Across+Intermediate+Streams+for+Batch+Processing
> > > > > >
> > > > > > Please review and comments are welcome!
> > > > > >
> > > > > > Thanks,
> > > > > > Xinyu
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

Posted by Yi Pan <ni...@gmail.com>.
Hi, Xinyu,

Thanks for the proposal. I took a quick pass and had the following
questions/comments:

- message shuffling ==> data shuffling???

- the proposal is for all types of control messages, not just for
end-of-stream, right? Better to define the scope and lay out the common
requirements of control message delivery.

- dropped option should go to “Rejected alternatives”

- “Samza finds out the following intermediate streams that all the inputs
have been end-of-stream”: what does this mean? Does the task consuming the
input stream(s) reconcile the EoS from all input stream partitions and then
propagate EoS messages to all partitions in the intermediate streams? This
is not super clear to me.

- in step-3, how does the consumer of intermediate streams know how many
EOS messages should be received? We should also make it clear that there is
one EOS per producer, and that the downstream consumer counts the number of
unique EOS messages from all upstream producers.

- In the comparison table, “checkpoint the control messages received” ==> is
it referring to the partially accumulated upstream EOS messages?

- Please clearly define “Watermark” and “EndOfStream”. Why are
they different? Are they both control messages that require the same
delivery pattern (i.e. broadcast to downstream, reconcile at the consumer)?
If yes, should we make “watermark” vs “EndOfStream” a sub-category of
control message?

- As for the serde for intermediate streams, I assume that we will need an
Avro envelope serde to wrap the user message and control message? So the
user-defined serde now only applies to the “UserMessage”? And what’s the
message key in the message format?

- A big question regarding the watermark propagation: “When Samza
receives watermark messages, it will emit a watermark with the earliest
event time across all the stream partitions. No emission if the earliest
event time doesn’t change.” Does the watermark propagation require
synchronization/coordination between all producers at the source? Say, if
the task taking one input source emits watermarks at a 1min interval and the
task taking another input source emits watermarks at a 5min interval, how
does the downstream consumer reconcile the watermarks?

- In the checkpoint message format, it seems that it is only designed for
watermark messages? Is there any streamId info that EoS carries over?
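
To make the EOS counting and watermark reconciliation questions above
concrete, here is a minimal sketch of what the consumer-side bookkeeping
could look like. It is only an illustration under assumptions:
ControlMessageTracker, onEndOfStream and onWatermark are hypothetical names,
not Samza APIs, and the sketch assumes the consumer knows the upstream
producer task count (for example from a taskCount field in the control
message). It follows the rule quoted from the SEP: emit the minimum
watermark across producers, and emit nothing if the minimum does not change.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical bookkeeping for one intermediate stream partition on the
// consumer side. None of these names come from Samza.
final class ControlMessageTracker {
    private final int producerTaskCount;                          // expected upstream producer tasks (assumed known)
    private final Set<String> eosProducers = new HashSet<>();     // producers that have sent EOS
    private final Map<String, Long> watermarks = new HashMap<>(); // latest watermark per producer
    private long lastEmitted = Long.MIN_VALUE;

    ControlMessageTracker(int producerTaskCount) {
        this.producerTaskCount = producerTaskCount;
    }

    /** Records an EOS from one producer; returns true once every producer has sent one. */
    boolean onEndOfStream(String producerTask) {
        eosProducers.add(producerTask); // the set ignores duplicate EOS from the same producer
        return eosProducers.size() == producerTaskCount;
    }

    /**
     * Records a watermark from one producer. Returns the new minimum to emit,
     * or null if some producer has not reported yet or the minimum did not advance.
     */
    Long onWatermark(String producerTask, long timestamp) {
        // Keep only the latest watermark per producer; older, re-emitted ones are ignored.
        watermarks.merge(producerTask, timestamp, Long::max);
        if (watermarks.size() < producerTaskCount) {
            return null;
        }
        long min = Long.MAX_VALUE;
        for (long ts : watermarks.values()) {
            min = Math.min(min, ts);
        }
        if (min <= lastEmitted) {
            return null;
        }
        lastEmitted = min;
        return min;
    }
}
```

With this kind of bookkeeping, producers emitting at different intervals
(1min vs 5min) would not need to coordinate: the emitted watermark simply
stays at the slower producer's latest value until that producer advances.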


Thanks a lot!


-Yi

On Tue, May 30, 2017 at 9:46 AM, xinyu liu <xi...@gmail.com> wrote:

> Makes sense. I noticed that too and I dropped the ControlMessage type in my
> code. I also moved taskName, taskCount to the parent ControlMessage class.
> Just updated the SEP-6. Please take a look again.
>
> Thanks,
> Xinyu

Re: [DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

Posted by xinyu liu <xi...@gmail.com>.
Makes sense. I noticed that too and I dropped the ControlMessage type in my
code. I also moved taskName and taskCount to the parent ControlMessage class.
I just updated SEP-6. Please take a look again.
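
For readers following along, the resulting layout might look roughly like
the sketch below. This is an assumption-laden illustration, not the code in
the SEP: only MessageType, ControlMessage, taskName and taskCount are named
in this thread; the enum constants, the subclass names WatermarkMessage and
EndOfStreamMessage, and the timestamp field are hypothetical.

```java
// Single type enum; there is no separate nested ControlMessage.Type.
// The constant names are assumed for illustration.
enum MessageType { USER_MESSAGE, WATERMARK, END_OF_STREAM }

// Parent class holding the fields shared by all control messages.
abstract class ControlMessage {
    private final String taskName; // producer task that emitted the message
    private final int taskCount;   // total number of producer tasks

    ControlMessage(String taskName, int taskCount) {
        this.taskName = taskName;
        this.taskCount = taskCount;
    }

    String getTaskName() { return taskName; }
    int getTaskCount() { return taskCount; }

    abstract MessageType getType();
}

// Hypothetical subclass carrying the watermark timestamp.
final class WatermarkMessage extends ControlMessage {
    private final long timestamp;

    WatermarkMessage(String taskName, int taskCount, long timestamp) {
        super(taskName, taskCount);
        this.timestamp = timestamp;
    }

    long getTimestamp() { return timestamp; }

    @Override
    MessageType getType() { return MessageType.WATERMARK; }
}

// Hypothetical subclass with no payload beyond the shared fields.
final class EndOfStreamMessage extends ControlMessage {
    EndOfStreamMessage(String taskName, int taskCount) {
        super(taskName, taskCount);
    }

    @Override
    MessageType getType() { return MessageType.END_OF_STREAM; }
}
```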

Thanks,
Xinyu

On Tue, May 30, 2017 at 9:12 AM, Chris Pettitt <
cpettitt@linkedin.com.invalid> wrote:

> MessageType and ControlMessage.Type look redundant. You could either use
> "ControlMessage" as the type in MessageType or drop ControlMessage.Type.

Re: [DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

Posted by Chris Pettitt <cp...@linkedin.com.INVALID>.
MessageType and ControlMessage.Type look redundant. You could either use
"ControlMessage" as the type in MessageType or drop ControlMessage.Type.

On Fri, May 26, 2017 at 5:14 PM, xinyu liu <xi...@gmail.com> wrote:

> Thanks a lot for the comments. I updated the SEP with more details and
> clarification. Please let me know if you have further questions.
>
> Thanks,
> Xinyu

Re: [DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

Posted by xinyu liu <xi...@gmail.com>.
Thanks a lot for the comments. I updated the SEP with more details and
clarification. Please let me know if you have further questions.

Thanks,
Xinyu

On Thu, May 25, 2017 at 11:19 AM, Prateek Maheshwari <
pmaheshwari@linkedin.com.invalid> wrote:

> Hi Xinyu,
>
> Thanks for the proposal. Some requests for clarifications. Let's update the
> SEP directly instead of replying here.
>
> E.g., in "For any following intermediate stream whose input streams are all
> end-of-stream, it will be marked as pending EOS" - Should clarify that
> (IIUC) something is injecting EOS messages in all intermediate stream
> partitions once it receives EOS from all input stream partitions it's
> consuming. Should also clarify what is that something.
> Same for "declare end of stream once all the EOS messages have been
> received." - What does this declaration involve and who is doing this?
>
> In pro for approach 2: Not clear what this means - "The watermark can
> conclude the input messages before this watermark have been complete."
>
> For the cons of approach 2: "Complicated failure scenario of the second
> job. It needs to checkpoint all the watermark messages received, so when it
> recovered from failure, it can still count." - How is this related to EOS?
> How is this related to the checkpoint watermark section?
> Also, what is the "more messages required to write.. " referring to?
>
> "Samza needs to reconcile based on the task counts." - Please explain what
> reconciliation means, why it needs to happen, and why we need to track the
> producer task and total task count in the watermark message to do this.
>
> Checkpoint watermarks section is also unclear. What problem are we trying
> to solve here?
>
> Should also move the message format and the watermark message interface
> sections to the bottom, since they depend on details in the event time and
> checkpoint watermark sections.
>
> Thanks,
> Prateek

Re: [DISCUSS] SEP-6: Support Watermark Across Intermediate Streams for Batch Processing

Posted by Prateek Maheshwari <pm...@linkedin.com.INVALID>.
Hi Xinyu,

Thanks for the proposal. Some requests for clarifications. Let's update the
SEP directly instead of replying here.

E.g., in "For any following intermediate stream whose input streams are all
end-of-stream, it will be marked as pending EOS" - Should clarify that
(IIUC) something is injecting EOS messages in all intermediate stream
partitions once it receives EOS from all input stream partitions it's
consuming. Should also clarify what that something is.
Same for "declare end of stream once all the EOS messages have been
received." - What does this declaration involve and who is doing this?

In pro for approach 2: Not clear what this means - "The watermark can
conclude the input messages before this watermark have been complete."

For the cons of approach 2: "Complicated failure scenario of the second
job. It needs to checkpoint all the watermark messages received, so when it
recovered from failure, it can still count." - How is this related to EOS?
How is this related to the checkpoint watermark section?
Also, what is the "more messages required to write.. " referring to?

"Samza needs to reconcile based on the task counts." - Please explain what
reconciliation means, why it needs to happen, and why we need to track the
producer task and total task count in the watermark message to do this.

Checkpoint watermarks section is also unclear. What problem are we trying
to solve here?

Should also move the message format and the watermark message interface
sections to the bottom, since they depend on details in the event time and
checkpoint watermark sections.

Thanks,
Prateek


On Wed, May 24, 2017 at 11:30 AM, xinyu liu <xi...@gmail.com> wrote:

> Hi all,
>
> I created SEP-6 for SAMZA-1260
> <https://issues.apache.org/jira/browse/SAMZA-1260>: Support Watermark
> Across Intermediate Streams for Batch Processing. The link to the SEP is
> here:
>
> https://cwiki.apache.org/confluence/display/SAMZA/SEP-
> 6+Support+Watermark+Across+Intermediate+Streams+for+Batch+Processing
>
> Please review and comments are welcome!
>
> Thanks,
> Xinyu
>