You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Zhijiang(wangzhijiang999)" <wa...@aliyun.com.INVALID> on 2018/10/18 06:42:16 UTC

回复:Sharing state between subtasks

Just noticed this discussion from @Till Rohrmann's weekly community update and I want to share some thoughts from our experiences.

We also encountered the source consuption skew issue before, and we are focused on improving this by two possible ways.

1. Control the read strategy by the downstream side. In detail, every input channel in downstream task corresponds to the consumption of one upstream source task, and we will tag each input channel with watermark to find the lowest channel to read in high priority. In essence, we actually rely on the mechanism of backpressure. If the channel with highest timestamp is not read by downstream task for a while, it will block the corresponding source task to read when the buffers are exhausted. It is no need to change the source interface in this way, but there are two major concerns: first it will affect the barier alignment resulting in checkpoint delayed or expired. Second it can not confirm source consumption alignment very precisely, and it is just a best effort way. So we gave up this way finally.

2. Add the new component of SourceCoordinator to coordinate the source consumption distributedly. For example we can start this componnet in the JobManager like the current role of CheckpointCoordinator. Then every source task would commnicate with JobManager via current RPC mechanism, maybe we can rely on the heartbeat message to attach the consumption progress as the payloads. The JobManagerwill accumulator or state all the reported progress and then give responses for different source tasks. We can define a protocol for indicating the fast soruce task to sleep for specific time for example. To do so, the coordinator has the global informations to give the proper decision for individuals, so it seems more precise. And it will not affect the barrier alignment, because the sleeping fast source can release the lock to emit barrier as normal. The only concern is the changes for source interface and may affect all related source implementations.

Currently we prefer to the second way to implement and will refer to other good points above. :)

Best,
Zhijiang
------------------------------------------------------------------
发件人:Jamie Grier <jg...@lyft.com.INVALID>
发送时间:2018年10月17日(星期三) 03:28
收件人:dev <de...@flink.apache.org>
主 题:Re: Sharing state between subtasks

Here's a doc I started describing some changes we would like to make
starting with the Kinesis Source.. It describes a refactoring of that code
specifically and also hopefully a pattern and some reusable code we can use
in the other sources as well.  The end goal would be best-effort event-time
synchronization across all Flink sources but we are going to start with the
Kinesis Source first.

Please take a look and please provide thoughts and opinions about the best
state sharing mechanism to use -- that section is left blank and we're
especially looking for input there.

https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing

-Jamie


On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <tr...@apache.org> wrote:

> But on the Kafka source level it should be perfectly fine to do what Elias
> proposed. This is of course is not the perfect solution but could bring us
> forward quite a bit. The changes required for this should also be minimal.
> This would become obsolete once we have something like shared state. But
> until then, I think it would worth a try.
>
> Cheers,
> Till
>
> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <al...@apache.org>
> wrote:
>
> > The reason this selective reading doesn't work well in Flink in the
> moment
> > is because of checkpointing. For checkpointing, checkpoint barriers
> travel
> > within the streams. If we selectively read from inputs based on
> timestamps
> > this is akin to blocking an input if that input is very far ahead in
> event
> > time, which can happen when you have a very fast source and a slow source
> > (in event time), maybe because you're in a catchup phase. In those cases
> > it's better to simply not read the data at the sources, as Thomas said.
> > This is also because with Kafka Streams, each operator is basically its
> own
> > job: it's reading from Kafka and writing to Kafka and there is not a
> > complex graph of different operations with network shuffles in between,
> as
> > you have with Flink.
> >
> > This different nature of Flink is also why I think that readers need
> > awareness of other readers to do the event-time alignment, and this is
> > where shared state comes in.
> >
> > > On 10. Oct 2018, at 20:47, Elias Levy <fe...@gmail.com>
> > wrote:
> > >
> > > On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <fh...@gmail.com>
> wrote:
> > >
> > >> I think the new source interface would be designed to be able to
> > leverage
> > >> shared state to achieve time alignment.
> > >> I don't think this would be possible without some kind of shared
> state.
> > >>
> > >> The problem of tasks that are far ahead in time cannot be solved with
> > >> back-pressure.
> > >> That's because a task cannot choose from which source task it accepts
> > >> events and from which doesn't.
> > >> If it blocks an input, all downstream tasks that are connected to the
> > >> operator are affected. This can easily lead to deadlocks.
> > >> Therefore, all operators need to be able to handle events when they
> > arrive.
> > >> If they cannot process them yet because they are too far ahead in
> time,
> > >> they are put in state.
> > >>
> > >
> > > The idea I was suggesting is not for operators to block an input.
> > Rather,
> > > it is that they selectively choose from which input to process the next
> > > message from based on their timestamp, so long as there are buffered
> > > messages waiting to be processed.  That is a best-effort alignment
> > > strategy.  Seems to work relatively well in practice, at least within
> > Kafka
> > > Streams.
> > >
> > > E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for
> both
> > > its inputs.  Instead, it could keep them separate and selectively
> consume
> > > from the one that had a buffer available, and if both have buffers
> > > available, from the buffer with the messages with a lower timestamp.
> >
> >
>


Re: Sharing state between subtasks

Posted by Thomas Weise <th...@apache.org>.
The state sharing support will be part of the upcoming 1.8 release.

We have also completed most of the synchronization work for the Kinesis
consumer and will contribute those changes to Flink soon.

Most of the code will be reusable for Kafka consumer.

We will need the same support in the Kafka consumer but have not started
work to integrate that yet.

Thomas

On Thu, Mar 7, 2019 at 6:53 AM Gerard Garcia <ge...@talaia.io> wrote:

> Any advance related to synchronizing ingestion by event/ingestion-time
> between kafka partitions?
>
> On Thu, Nov 15, 2018 at 1:27 AM Jamie Grier <jg...@lyft.com.invalid>
> wrote:
>
> > Hey all,
> >
> > I think all we need for this on the state sharing side is pretty
> simple.  I
> > opened a JIRA to track this work and submitted a PR for the state sharing
> > bit.
> >
> > https://issues.apache.org/jira/browse/FLINK-10886
> > https://github.com/apache/flink/pull/7099
> >
> > Please provide feedback :)
> >
> > -Jamie
> >
> >
> > On Thu, Nov 1, 2018 at 3:33 AM Till Rohrmann <tr...@apache.org>
> wrote:
> >
> > > Hi Thomas,
> > >
> > > using Akka directly would further manifest our dependency on Scala in
> > > flink-runtime. This is something we are currently trying to get rid of.
> > For
> > > that purpose we have added the RpcService abstraction which
> encapsulates
> > > all Akka specific logic. We hope that we can soon get rid of the Scala
> > > dependency in flink-runtime by using a special class loader only for
> > > loading the AkkaRpcService implementation.
> > >
> > > I think the easiest way to sync the task information is actually going
> > > through the JobMaster because the subtasks don't know on which other
> TMs
> > > the other subtasks run. Otherwise, we would need to have some TM
> > detection
> > > mechanism between TMs. If you choose this way, then you should be able
> to
> > > use the RpcService by extending the JobMasterGateway by additional
> RPCs.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Oct 31, 2018 at 6:52 PM Thomas Weise <th...@apache.org> wrote:
> > >
> > > > Hi,
> > > >
> > > > We are planning to work on the Kinesis consumer in the following
> order:
> > > >
> > > > 1. Add per shard watermarking:
> > > > https://issues.apache.org/jira/browse/FLINK-5697 - this will be code
> > we
> > > > already use internally; I will open a PR to add it to the Flink
> Kinesis
> > > > consumer
> > > > 2. Exchange of per subtask watermarks between all subtasks of one or
> > > > multiple sources
> > > > 3. Implement queue approach described in Jamie's document in to
> utilize
> > > 1.)
> > > > and 2.) to align the shard consumers WRT event time
> > > >
> > > > There was some discussion regarding the mechanism to share the
> > watermarks
> > > > between subtasks. If there is something that can be re-used it would
> be
> > > > great. Otherwise I'm going to further investigate the Akka or JGroups
> > > > route. Regarding Akka, since it is used within Flink already, is
> there
> > an
> > > > abstraction that you would recommend to consider to avoid direct
> > > > dependency?
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > >
> > > >
> > > > On Thu, Oct 18, 2018 at 8:34 PM Zhijiang(wangzhijiang999)
> > > > <wa...@aliyun.com.invalid> wrote:
> > > >
> > > > > Not yet. We only have some initial thoughts and have not worked on
> it
> > > > yet.
> > > > > We will update the progress in this discussion if have.
> > > > >
> > > > > Best,
> > > > > Zhijiang
> > > > > ------------------------------------------------------------------
> > > > > 发件人:Aljoscha Krettek <al...@apache.org>
> > > > > 发送时间:2018年10月18日(星期四) 17:53
> > > > > 收件人:dev <de...@flink.apache.org>; Zhijiang(wangzhijiang999) <
> > > > > wangzhijiang999@aliyun.com>
> > > > > 抄 送:Till Rohrmann <tr...@apache.org>
> > > > > 主 题:Re: Sharing state between subtasks
> > > > >
> > > > > Hi Zhijiang,
> > > > >
> > > > > do you already have working code or a design doc for the second
> > > approach?
> > > > >
> > > > > Best,
> > > > > Aljoscha
> > > > >
> > > > > > On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) <
> > > > > wangzhijiang999@aliyun.com.INVALID> wrote:
> > > > > >
> > > > > > Just noticed this discussion from @Till Rohrmann's weekly
> community
> > > > > update and I want to share some thoughts from our experiences.
> > > > > >
> > > > > > We also encountered the source consuption skew issue before, and
> we
> > > are
> > > > > focused on improving this by two possible ways.
> > > > > >
> > > > > > 1. Control the read strategy by the downstream side. In detail,
> > every
> > > > > input channel in downstream task corresponds to the consumption of
> > one
> > > > > upstream source task, and we will tag each input channel with
> > watermark
> > > > to
> > > > > find the lowest channel to read in high priority. In essence, we
> > > actually
> > > > > rely on the mechanism of backpressure. If the channel with highest
> > > > > timestamp is not read by downstream task for a while, it will block
> > the
> > > > > corresponding source task to read when the buffers are exhausted.
> It
> > is
> > > > no
> > > > > need to change the source interface in this way, but there are two
> > > major
> > > > > concerns: first it will affect the barier alignment resulting in
> > > > checkpoint
> > > > > delayed or expired. Second it can not confirm source consumption
> > > > alignment
> > > > > very precisely, and it is just a best effort way. So we gave up
> this
> > > way
> > > > > finally.
> > > > > >
> > > > > > 2. Add the new component of SourceCoordinator to coordinate the
> > > source
> > > > > consumption distributedly. For example we can start this componnet
> in
> > > the
> > > > > JobManager like the current role of CheckpointCoordinator. Then
> every
> > > > > source task would commnicate with JobManager via current RPC
> > mechanism,
> > > > > maybe we can rely on the heartbeat message to attach the
> consumption
> > > > > progress as the payloads. The JobManagerwill accumulator or state
> all
> > > the
> > > > > reported progress and then give responses for different source
> tasks.
> > > We
> > > > > can define a protocol for indicating the fast soruce task to sleep
> > for
> > > > > specific time for example. To do so, the coordinator has the global
> > > > > informations to give the proper decision for individuals, so it
> seems
> > > > more
> > > > > precise. And it will not affect the barrier alignment, because the
> > > > sleeping
> > > > > fast source can release the lock to emit barrier as normal. The
> only
> > > > > concern is the changes for source interface and may affect all
> > related
> > > > > source implementations.
> > > > > >
> > > > > > Currently we prefer to the second way to implement and will refer
> > to
> > > > > other good points above. :)
> > > > > >
> > > > > > Best,
> > > > > > Zhijiang
> > > > > >
> ------------------------------------------------------------------
> > > > > > 发件人:Jamie Grier <jg...@lyft.com.INVALID>
> > > > > > 发送时间:2018年10月17日(星期三) 03:28
> > > > > > 收件人:dev <de...@flink.apache.org>
> > > > > > 主 题:Re: Sharing state between subtasks
> > > > > >
> > > > > > Here's a doc I started describing some changes we would like to
> > make
> > > > > > starting with the Kinesis Source.. It describes a refactoring of
> > that
> > > > > code
> > > > > > specifically and also hopefully a pattern and some reusable code
> we
> > > can
> > > > > use
> > > > > > in the other sources as well.  The end goal would be best-effort
> > > > > event-time
> > > > > > synchronization across all Flink sources but we are going to
> start
> > > with
> > > > > the
> > > > > > Kinesis Source first.
> > > > > >
> > > > > > Please take a look and please provide thoughts and opinions about
> > the
> > > > > best
> > > > > > state sharing mechanism to use -- that section is left blank and
> > > we're
> > > > > > especially looking for input there.
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
> > > > > >
> > > > > > -Jamie
> > > > > >
> > > > > >
> > > > > > On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <
> > trohrmann@apache.org
> > > >
> > > > > wrote:
> > > > > >
> > > > > >> But on the Kafka source level it should be perfectly fine to do
> > what
> > > > > Elias
> > > > > >> proposed. This is of course is not the perfect solution but
> could
> > > > bring
> > > > > us
> > > > > >> forward quite a bit. The changes required for this should also
> be
> > > > > minimal.
> > > > > >> This would become obsolete once we have something like shared
> > state.
> > > > But
> > > > > >> until then, I think it would worth a try.
> > > > > >>
> > > > > >> Cheers,
> > > > > >> Till
> > > > > >>
> > > > > >> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <
> > > aljoscha@apache.org
> > > > >
> > > > > >> wrote:
> > > > > >>
> > > > > >>> The reason this selective reading doesn't work well in Flink in
> > the
> > > > > >> moment
> > > > > >>> is because of checkpointing. For checkpointing, checkpoint
> > barriers
> > > > > >> travel
> > > > > >>> within the streams. If we selectively read from inputs based on
> > > > > >> timestamps
> > > > > >>> this is akin to blocking an input if that input is very far
> ahead
> > > in
> > > > > >> event
> > > > > >>> time, which can happen when you have a very fast source and a
> > slow
> > > > > source
> > > > > >>> (in event time), maybe because you're in a catchup phase. In
> > those
> > > > > cases
> > > > > >>> it's better to simply not read the data at the sources, as
> Thomas
> > > > said.
> > > > > >>> This is also because with Kafka Streams, each operator is
> > basically
> > > > its
> > > > > >> own
> > > > > >>> job: it's reading from Kafka and writing to Kafka and there is
> > not
> > > a
> > > > > >>> complex graph of different operations with network shuffles in
> > > > between,
> > > > > >> as
> > > > > >>> you have with Flink.
> > > > > >>>
> > > > > >>> This different nature of Flink is also why I think that readers
> > > need
> > > > > >>> awareness of other readers to do the event-time alignment, and
> > this
> > > > is
> > > > > >>> where shared state comes in.
> > > > > >>>
> > > > > >>>> On 10. Oct 2018, at 20:47, Elias Levy <
> > > fearsome.lucidity@gmail.com>
> > > > > >>> wrote:
> > > > > >>>>
> > > > > >>>> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <
> > fhueske@gmail.com>
> > > > > >> wrote:
> > > > > >>>>
> > > > > >>>>> I think the new source interface would be designed to be able
> > to
> > > > > >>> leverage
> > > > > >>>>> shared state to achieve time alignment.
> > > > > >>>>> I don't think this would be possible without some kind of
> > shared
> > > > > >> state.
> > > > > >>>>>
> > > > > >>>>> The problem of tasks that are far ahead in time cannot be
> > solved
> > > > with
> > > > > >>>>> back-pressure.
> > > > > >>>>> That's because a task cannot choose from which source task it
> > > > accepts
> > > > > >>>>> events and from which doesn't.
> > > > > >>>>> If it blocks an input, all downstream tasks that are
> connected
> > to
> > > > the
> > > > > >>>>> operator are affected. This can easily lead to deadlocks.
> > > > > >>>>> Therefore, all operators need to be able to handle events
> when
> > > they
> > > > > >>> arrive.
> > > > > >>>>> If they cannot process them yet because they are too far
> ahead
> > in
> > > > > >> time,
> > > > > >>>>> they are put in state.
> > > > > >>>>>
> > > > > >>>>
> > > > > >>>> The idea I was suggesting is not for operators to block an
> > input.
> > > > > >>> Rather,
> > > > > >>>> it is that they selectively choose from which input to process
> > the
> > > > > next
> > > > > >>>> message from based on their timestamp, so long as there are
> > > buffered
> > > > > >>>> messages waiting to be processed.  That is a best-effort
> > alignment
> > > > > >>>> strategy.  Seems to work relatively well in practice, at least
> > > > within
> > > > > >>> Kafka
> > > > > >>>> Streams.
> > > > > >>>>
> > > > > >>>> E.g. at the moment StreamTwoInputProcessor uses a
> UnionInputGate
> > > for
> > > > > >> both
> > > > > >>>> its inputs.  Instead, it could keep them separate and
> > selectively
> > > > > >> consume
> > > > > >>>> from the one that had a buffer available, and if both have
> > buffers
> > > > > >>>> available, from the buffer with the messages with a lower
> > > timestamp.
> > > > > >>>
> > > > > >>>
> > > > > >>
> > > > > >
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>

Re: Sharing state between subtasks

Posted by Gerard Garcia <ge...@talaia.io>.
Any advance related to synchronizing ingestion by event/ingestion-time
between kafka partitions?

On Thu, Nov 15, 2018 at 1:27 AM Jamie Grier <jg...@lyft.com.invalid> wrote:

> Hey all,
>
> I think all we need for this on the state sharing side is pretty simple.  I
> opened a JIRA to track this work and submitted a PR for the state sharing
> bit.
>
> https://issues.apache.org/jira/browse/FLINK-10886
> https://github.com/apache/flink/pull/7099
>
> Please provide feedback :)
>
> -Jamie
>
>
> On Thu, Nov 1, 2018 at 3:33 AM Till Rohrmann <tr...@apache.org> wrote:
>
> > Hi Thomas,
> >
> > using Akka directly would further manifest our dependency on Scala in
> > flink-runtime. This is something we are currently trying to get rid of.
> For
> > that purpose we have added the RpcService abstraction which encapsulates
> > all Akka specific logic. We hope that we can soon get rid of the Scala
> > dependency in flink-runtime by using a special class loader only for
> > loading the AkkaRpcService implementation.
> >
> > I think the easiest way to sync the task information is actually going
> > through the JobMaster because the subtasks don't know on which other TMs
> > the other subtasks run. Otherwise, we would need to have some TM
> detection
> > mechanism between TMs. If you choose this way, then you should be able to
> > use the RpcService by extending the JobMasterGateway by additional RPCs.
> >
> > Cheers,
> > Till
> >
> > On Wed, Oct 31, 2018 at 6:52 PM Thomas Weise <th...@apache.org> wrote:
> >
> > > Hi,
> > >
> > > We are planning to work on the Kinesis consumer in the following order:
> > >
> > > 1. Add per shard watermarking:
> > > https://issues.apache.org/jira/browse/FLINK-5697 - this will be code
> we
> > > already use internally; I will open a PR to add it to the Flink Kinesis
> > > consumer
> > > 2. Exchange of per subtask watermarks between all subtasks of one or
> > > multiple sources
> > > 3. Implement queue approach described in Jamie's document in to utilize
> > 1.)
> > > and 2.) to align the shard consumers WRT event time
> > >
> > > There was some discussion regarding the mechanism to share the
> watermarks
> > > between subtasks. If there is something that can be re-used it would be
> > > great. Otherwise I'm going to further investigate the Akka or JGroups
> > > route. Regarding Akka, since it is used within Flink already, is there
> an
> > > abstraction that you would recommend to consider to avoid direct
> > > dependency?
> > >
> > > Thanks,
> > > Thomas
> > >
> > >
> > >
> > > On Thu, Oct 18, 2018 at 8:34 PM Zhijiang(wangzhijiang999)
> > > <wa...@aliyun.com.invalid> wrote:
> > >
> > > > Not yet. We only have some initial thoughts and have not worked on it
> > > yet.
> > > > We will update the progress in this discussion if have.
> > > >
> > > > Best,
> > > > Zhijiang
> > > > ------------------------------------------------------------------
> > > > 发件人:Aljoscha Krettek <al...@apache.org>
> > > > 发送时间:2018年10月18日(星期四) 17:53
> > > > 收件人:dev <de...@flink.apache.org>; Zhijiang(wangzhijiang999) <
> > > > wangzhijiang999@aliyun.com>
> > > > 抄 送:Till Rohrmann <tr...@apache.org>
> > > > 主 题:Re: Sharing state between subtasks
> > > >
> > > > Hi Zhijiang,
> > > >
> > > > do you already have working code or a design doc for the second
> > approach?
> > > >
> > > > Best,
> > > > Aljoscha
> > > >
> > > > > On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) <
> > > > wangzhijiang999@aliyun.com.INVALID> wrote:
> > > > >
> > > > > Just noticed this discussion from @Till Rohrmann's weekly community
> > > > update and I want to share some thoughts from our experiences.
> > > > >
> > > > > We also encountered the source consuption skew issue before, and we
> > are
> > > > focused on improving this by two possible ways.
> > > > >
> > > > > 1. Control the read strategy by the downstream side. In detail,
> every
> > > > input channel in downstream task corresponds to the consumption of
> one
> > > > upstream source task, and we will tag each input channel with
> watermark
> > > to
> > > > find the lowest channel to read in high priority. In essence, we
> > actually
> > > > rely on the mechanism of backpressure. If the channel with highest
> > > > timestamp is not read by downstream task for a while, it will block
> the
> > > > corresponding source task to read when the buffers are exhausted. It
> is
> > > no
> > > > need to change the source interface in this way, but there are two
> > major
> > > > concerns: first it will affect the barier alignment resulting in
> > > checkpoint
> > > > delayed or expired. Second it can not confirm source consumption
> > > alignment
> > > > very precisely, and it is just a best effort way. So we gave up this
> > way
> > > > finally.
> > > > >
> > > > > 2. Add the new component of SourceCoordinator to coordinate the
> > source
> > > > consumption distributedly. For example we can start this componnet in
> > the
> > > > JobManager like the current role of CheckpointCoordinator. Then every
> > > > source task would commnicate with JobManager via current RPC
> mechanism,
> > > > maybe we can rely on the heartbeat message to attach the consumption
> > > > progress as the payloads. The JobManagerwill accumulator or state all
> > the
> > > > reported progress and then give responses for different source tasks.
> > We
> > > > can define a protocol for indicating the fast soruce task to sleep
> for
> > > > specific time for example. To do so, the coordinator has the global
> > > > informations to give the proper decision for individuals, so it seems
> > > more
> > > > precise. And it will not affect the barrier alignment, because the
> > > sleeping
> > > > fast source can release the lock to emit barrier as normal. The only
> > > > concern is the changes for source interface and may affect all
> related
> > > > source implementations.
> > > > >
> > > > > Currently we prefer to the second way to implement and will refer
> to
> > > > other good points above. :)
> > > > >
> > > > > Best,
> > > > > Zhijiang
> > > > > ------------------------------------------------------------------
> > > > > 发件人:Jamie Grier <jg...@lyft.com.INVALID>
> > > > > 发送时间:2018年10月17日(星期三) 03:28
> > > > > 收件人:dev <de...@flink.apache.org>
> > > > > 主 题:Re: Sharing state between subtasks
> > > > >
> > > > > Here's a doc I started describing some changes we would like to
> make
> > > > > starting with the Kinesis Source.. It describes a refactoring of
> that
> > > > code
> > > > > specifically and also hopefully a pattern and some reusable code we
> > can
> > > > use
> > > > > in the other sources as well.  The end goal would be best-effort
> > > > event-time
> > > > > synchronization across all Flink sources but we are going to start
> > with
> > > > the
> > > > > Kinesis Source first.
> > > > >
> > > > > Please take a look and please provide thoughts and opinions about
> the
> > > > best
> > > > > state sharing mechanism to use -- that section is left blank and
> > we're
> > > > > especially looking for input there.
> > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
> > > > >
> > > > > -Jamie
> > > > >
> > > > >
> > > > > On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <
> trohrmann@apache.org
> > >
> > > > wrote:
> > > > >
> > > > >> But on the Kafka source level it should be perfectly fine to do
> what
> > > > Elias
> > > > >> proposed. This is of course is not the perfect solution but could
> > > bring
> > > > us
> > > > >> forward quite a bit. The changes required for this should also be
> > > > minimal.
> > > > >> This would become obsolete once we have something like shared
> state.
> > > But
> > > > >> until then, I think it would worth a try.
> > > > >>
> > > > >> Cheers,
> > > > >> Till
> > > > >>
> > > > >> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <
> > aljoscha@apache.org
> > > >
> > > > >> wrote:
> > > > >>
> > > > >>> The reason this selective reading doesn't work well in Flink in
> the
> > > > >> moment
> > > > >>> is because of checkpointing. For checkpointing, checkpoint
> barriers
> > > > >> travel
> > > > >>> within the streams. If we selectively read from inputs based on
> > > > >> timestamps
> > > > >>> this is akin to blocking an input if that input is very far ahead
> > in
> > > > >> event
> > > > >>> time, which can happen when you have a very fast source and a
> slow
> > > > source
> > > > >>> (in event time), maybe because you're in a catchup phase. In
> those
> > > > cases
> > > > >>> it's better to simply not read the data at the sources, as Thomas
> > > said.
> > > > >>> This is also because with Kafka Streams, each operator is
> basically
> > > its
> > > > >> own
> > > > >>> job: it's reading from Kafka and writing to Kafka and there is
> not
> > a
> > > > >>> complex graph of different operations with network shuffles in
> > > between,
> > > > >> as
> > > > >>> you have with Flink.
> > > > >>>
> > > > >>> This different nature of Flink is also why I think that readers
> > need
> > > > >>> awareness of other readers to do the event-time alignment, and
> this
> > > is
> > > > >>> where shared state comes in.
> > > > >>>
> > > > >>>> On 10. Oct 2018, at 20:47, Elias Levy <
> > fearsome.lucidity@gmail.com>
> > > > >>> wrote:
> > > > >>>>
> > > > >>>> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <
> fhueske@gmail.com>
> > > > >> wrote:
> > > > >>>>
> > > > >>>>> I think the new source interface would be designed to be able
> to
> > > > >>> leverage
> > > > >>>>> shared state to achieve time alignment.
> > > > >>>>> I don't think this would be possible without some kind of
> shared
> > > > >> state.
> > > > >>>>>
> > > > >>>>> The problem of tasks that are far ahead in time cannot be
> solved
> > > with
> > > > >>>>> back-pressure.
> > > > >>>>> That's because a task cannot choose from which source task it
> > > accepts
> > > > >>>>> events and from which doesn't.
> > > > >>>>> If it blocks an input, all downstream tasks that are connected
> to
> > > the
> > > > >>>>> operator are affected. This can easily lead to deadlocks.
> > > > >>>>> Therefore, all operators need to be able to handle events when
> > they
> > > > >>> arrive.
> > > > >>>>> If they cannot process them yet because they are too far ahead
> in
> > > > >> time,
> > > > >>>>> they are put in state.
> > > > >>>>>
> > > > >>>>
> > > > >>>> The idea I was suggesting is not for operators to block an
> input.
> > > > >>> Rather,
> > > > >>>> it is that they selectively choose from which input to process
> the
> > > > next
> > > > >>>> message from based on their timestamp, so long as there are
> > buffered
> > > > >>>> messages waiting to be processed.  That is a best-effort
> alignment
> > > > >>>> strategy.  Seems to work relatively well in practice, at least
> > > within
> > > > >>> Kafka
> > > > >>>> Streams.
> > > > >>>>
> > > > >>>> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate
> > for
> > > > >> both
> > > > >>>> its inputs.  Instead, it could keep them separate and
> selectively
> > > > >> consume
> > > > >>>> from the one that had a buffer available, and if both have
> buffers
> > > > >>>> available, from the buffer with the messages with a lower
> > timestamp.
> > > > >>>
> > > > >>>
> > > > >>
> > > > >
> > > >
> > > >
> > > >
> > >
> >
>

Re: Sharing state between subtasks

Posted by Jamie Grier <jg...@lyft.com.INVALID>.
Hey all,

I think all we need for this on the state sharing side is pretty simple.  I
opened a JIRA to track this work and submitted a PR for the state sharing
bit.

https://issues.apache.org/jira/browse/FLINK-10886
https://github.com/apache/flink/pull/7099

Please provide feedback :)

-Jamie


On Thu, Nov 1, 2018 at 3:33 AM Till Rohrmann <tr...@apache.org> wrote:

> Hi Thomas,
>
> using Akka directly would further manifest our dependency on Scala in
> flink-runtime. This is something we are currently trying to get rid of. For
> that purpose we have added the RpcService abstraction which encapsulates
> all Akka specific logic. We hope that we can soon get rid of the Scala
> dependency in flink-runtime by using a special class loader only for
> loading the AkkaRpcService implementation.
>
> I think the easiest way to sync the task information is actually going
> through the JobMaster because the subtasks don't know on which other TMs
> the other subtasks run. Otherwise, we would need to have some TM detection
> mechanism between TMs. If you choose this way, then you should be able to
> use the RpcService by extending the JobMasterGateway by additional RPCs.
>
> Cheers,
> Till
>
> On Wed, Oct 31, 2018 at 6:52 PM Thomas Weise <th...@apache.org> wrote:
>
> > Hi,
> >
> > We are planning to work on the Kinesis consumer in the following order:
> >
> > 1. Add per shard watermarking:
> > https://issues.apache.org/jira/browse/FLINK-5697 - this will be code we
> > already use internally; I will open a PR to add it to the Flink Kinesis
> > consumer
> > 2. Exchange of per subtask watermarks between all subtasks of one or
> > multiple sources
> > 3. Implement queue approach described in Jamie's document in to utilize
> 1.)
> > and 2.) to align the shard consumers WRT event time
> >
> > There was some discussion regarding the mechanism to share the watermarks
> > between subtasks. If there is something that can be re-used it would be
> > great. Otherwise I'm going to further investigate the Akka or JGroups
> > route. Regarding Akka, since it is used within Flink already, is there an
> > abstraction that you would recommend to consider to avoid direct
> > dependency?
> >
> > Thanks,
> > Thomas
> >
> >
> >
> > On Thu, Oct 18, 2018 at 8:34 PM Zhijiang(wangzhijiang999)
> > <wa...@aliyun.com.invalid> wrote:
> >
> > > Not yet. We only have some initial thoughts and have not worked on it
> > yet.
> > > We will update the progress in this discussion if have.
> > >
> > > Best,
> > > Zhijiang
> > > ------------------------------------------------------------------
> > > 发件人:Aljoscha Krettek <al...@apache.org>
> > > 发送时间:2018年10月18日(星期四) 17:53
> > > 收件人:dev <de...@flink.apache.org>; Zhijiang(wangzhijiang999) <
> > > wangzhijiang999@aliyun.com>
> > > 抄 送:Till Rohrmann <tr...@apache.org>
> > > 主 题:Re: Sharing state between subtasks
> > >
> > > Hi Zhijiang,
> > >
> > > do you already have working code or a design doc for the second
> approach?
> > >
> > > Best,
> > > Aljoscha
> > >
> > > > On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) <
> > > wangzhijiang999@aliyun.com.INVALID> wrote:
> > > >
> > > > Just noticed this discussion from @Till Rohrmann's weekly community
> > > update and I want to share some thoughts from our experiences.
> > > >
> > > > We also encountered the source consuption skew issue before, and we
> are
> > > focused on improving this by two possible ways.
> > > >
> > > > 1. Control the read strategy by the downstream side. In detail, every
> > > input channel in downstream task corresponds to the consumption of one
> > > upstream source task, and we will tag each input channel with watermark
> > to
> > > find the lowest channel to read in high priority. In essence, we
> actually
> > > rely on the mechanism of backpressure. If the channel with highest
> > > timestamp is not read by downstream task for a while, it will block the
> > > corresponding source task to read when the buffers are exhausted. It is
> > no
> > > need to change the source interface in this way, but there are two
> major
> > > concerns: first it will affect the barier alignment resulting in
> > checkpoint
> > > delayed or expired. Second it can not confirm source consumption
> > alignment
> > > very precisely, and it is just a best effort way. So we gave up this
> way
> > > finally.
> > > >
> > > > 2. Add the new component of SourceCoordinator to coordinate the
> source
> > > consumption distributedly. For example we can start this componnet in
> the
> > > JobManager like the current role of CheckpointCoordinator. Then every
> > > source task would commnicate with JobManager via current RPC mechanism,
> > > maybe we can rely on the heartbeat message to attach the consumption
> > > progress as the payloads. The JobManagerwill accumulator or state all
> the
> > > reported progress and then give responses for different source tasks.
> We
> > > can define a protocol for indicating the fast soruce task to sleep for
> > > specific time for example. To do so, the coordinator has the global
> > > informations to give the proper decision for individuals, so it seems
> > more
> > > precise. And it will not affect the barrier alignment, because the
> > sleeping
> > > fast source can release the lock to emit barrier as normal. The only
> > > concern is the changes for source interface and may affect all related
> > > source implementations.
> > > >
> > > > Currently we prefer to the second way to implement and will refer to
> > > other good points above. :)
> > > >
> > > > Best,
> > > > Zhijiang
> > > > ------------------------------------------------------------------
> > > > 发件人:Jamie Grier <jg...@lyft.com.INVALID>
> > > > 发送时间:2018年10月17日(星期三) 03:28
> > > > 收件人:dev <de...@flink.apache.org>
> > > > 主 题:Re: Sharing state between subtasks
> > > >
> > > > Here's a doc I started describing some changes we would like to make
> > > > starting with the Kinesis Source.. It describes a refactoring of that
> > > code
> > > > specifically and also hopefully a pattern and some reusable code we
> can
> > > use
> > > > in the other sources as well.  The end goal would be best-effort
> > > event-time
> > > > synchronization across all Flink sources but we are going to start
> with
> > > the
> > > > Kinesis Source first.
> > > >
> > > > Please take a look and please provide thoughts and opinions about the
> > > best
> > > > state sharing mechanism to use -- that section is left blank and
> we're
> > > > especially looking for input there.
> > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
> > > >
> > > > -Jamie
> > > >
> > > >
> > > > On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <trohrmann@apache.org
> >
> > > wrote:
> > > >
> > > >> But on the Kafka source level it should be perfectly fine to do what
> > > Elias
> > > >> proposed. This is of course is not the perfect solution but could
> > bring
> > > us
> > > >> forward quite a bit. The changes required for this should also be
> > > minimal.
> > > >> This would become obsolete once we have something like shared state.
> > But
> > > >> until then, I think it would worth a try.
> > > >>
> > > >> Cheers,
> > > >> Till
> > > >>
> > > >> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <
> aljoscha@apache.org
> > >
> > > >> wrote:
> > > >>
> > > >>> The reason this selective reading doesn't work well in Flink in the
> > > >> moment
> > > >>> is because of checkpointing. For checkpointing, checkpoint barriers
> > > >> travel
> > > >>> within the streams. If we selectively read from inputs based on
> > > >> timestamps
> > > >>> this is akin to blocking an input if that input is very far ahead
> in
> > > >> event
> > > >>> time, which can happen when you have a very fast source and a slow
> > > source
> > > >>> (in event time), maybe because you're in a catchup phase. In those
> > > cases
> > > >>> it's better to simply not read the data at the sources, as Thomas
> > said.
> > > >>> This is also because with Kafka Streams, each operator is basically
> > its
> > > >> own
> > > >>> job: it's reading from Kafka and writing to Kafka and there is not
> a
> > > >>> complex graph of different operations with network shuffles in
> > between,
> > > >> as
> > > >>> you have with Flink.
> > > >>>
> > > >>> This different nature of Flink is also why I think that readers
> need
> > > >>> awareness of other readers to do the event-time alignment, and this
> > is
> > > >>> where shared state comes in.
> > > >>>
> > > >>>> On 10. Oct 2018, at 20:47, Elias Levy <
> fearsome.lucidity@gmail.com>
> > > >>> wrote:
> > > >>>>
> > > >>>> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <fh...@gmail.com>
> > > >> wrote:
> > > >>>>
> > > >>>>> I think the new source interface would be designed to be able to
> > > >>> leverage
> > > >>>>> shared state to achieve time alignment.
> > > >>>>> I don't think this would be possible without some kind of shared
> > > >> state.
> > > >>>>>
> > > >>>>> The problem of tasks that are far ahead in time cannot be solved
> > with
> > > >>>>> back-pressure.
> > > >>>>> That's because a task cannot choose from which source task it
> > accepts
> > > >>>>> events and from which doesn't.
> > > >>>>> If it blocks an input, all downstream tasks that are connected to
> > the
> > > >>>>> operator are affected. This can easily lead to deadlocks.
> > > >>>>> Therefore, all operators need to be able to handle events when
> they
> > > >>> arrive.
> > > >>>>> If they cannot process them yet because they are too far ahead in
> > > >> time,
> > > >>>>> they are put in state.
> > > >>>>>
> > > >>>>
> > > >>>> The idea I was suggesting is not for operators to block an input.
> > > >>> Rather,
> > > >>>> it is that they selectively choose from which input to process the
> > > next
> > > >>>> message from based on their timestamp, so long as there are
> buffered
> > > >>>> messages waiting to be processed.  That is a best-effort alignment
> > > >>>> strategy.  Seems to work relatively well in practice, at least
> > within
> > > >>> Kafka
> > > >>>> Streams.
> > > >>>>
> > > >>>> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate
> for
> > > >> both
> > > >>>> its inputs.  Instead, it could keep them separate and selectively
> > > >> consume
> > > >>>> from the one that had a buffer available, and if both have buffers
> > > >>>> available, from the buffer with the messages with a lower
> timestamp.
> > > >>>
> > > >>>
> > > >>
> > > >
> > >
> > >
> > >
> >
>

Re: Sharing state between subtasks

Posted by Till Rohrmann <tr...@apache.org>.
Hi Thomas,

using Akka directly would further manifest our dependency on Scala in
flink-runtime. This is something we are currently trying to get rid of. For
that purpose we have added the RpcService abstraction which encapsulates
all Akka specific logic. We hope that we can soon get rid of the Scala
dependency in flink-runtime by using a special class loader only for
loading the AkkaRpcService implementation.

I think the easiest way to sync the task information is actually going
through the JobMaster because the subtasks don't know on which other TMs
the other subtasks run. Otherwise, we would need to have some TM detection
mechanism between TMs. If you choose this way, then you should be able to
use the RpcService by extending the JobMasterGateway by additional RPCs.

Cheers,
Till

On Wed, Oct 31, 2018 at 6:52 PM Thomas Weise <th...@apache.org> wrote:

> Hi,
>
> We are planning to work on the Kinesis consumer in the following order:
>
> 1. Add per shard watermarking:
> https://issues.apache.org/jira/browse/FLINK-5697 - this will be code we
> already use internally; I will open a PR to add it to the Flink Kinesis
> consumer
> 2. Exchange of per subtask watermarks between all subtasks of one or
> multiple sources
> 3. Implement queue approach described in Jamie's document in to utilize 1.)
> and 2.) to align the shard consumers WRT event time
>
> There was some discussion regarding the mechanism to share the watermarks
> between subtasks. If there is something that can be re-used it would be
> great. Otherwise I'm going to further investigate the Akka or JGroups
> route. Regarding Akka, since it is used within Flink already, is there an
> abstraction that you would recommend to consider to avoid direct
> dependency?
>
> Thanks,
> Thomas
>
>
>
> On Thu, Oct 18, 2018 at 8:34 PM Zhijiang(wangzhijiang999)
> <wa...@aliyun.com.invalid> wrote:
>
> > Not yet. We only have some initial thoughts and have not worked on it
> yet.
> > We will update the progress in this discussion if have.
> >
> > Best,
> > Zhijiang
> > ------------------------------------------------------------------
> > 发件人:Aljoscha Krettek <al...@apache.org>
> > 发送时间:2018年10月18日(星期四) 17:53
> > 收件人:dev <de...@flink.apache.org>; Zhijiang(wangzhijiang999) <
> > wangzhijiang999@aliyun.com>
> > 抄 送:Till Rohrmann <tr...@apache.org>
> > 主 题:Re: Sharing state between subtasks
> >
> > Hi Zhijiang,
> >
> > do you already have working code or a design doc for the second approach?
> >
> > Best,
> > Aljoscha
> >
> > > On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) <
> > wangzhijiang999@aliyun.com.INVALID> wrote:
> > >
> > > Just noticed this discussion from @Till Rohrmann's weekly community
> > update and I want to share some thoughts from our experiences.
> > >
> > > We also encountered the source consuption skew issue before, and we are
> > focused on improving this by two possible ways.
> > >
> > > 1. Control the read strategy by the downstream side. In detail, every
> > input channel in downstream task corresponds to the consumption of one
> > upstream source task, and we will tag each input channel with watermark
> to
> > find the lowest channel to read in high priority. In essence, we actually
> > rely on the mechanism of backpressure. If the channel with highest
> > timestamp is not read by downstream task for a while, it will block the
> > corresponding source task to read when the buffers are exhausted. It is
> no
> > need to change the source interface in this way, but there are two major
> > concerns: first it will affect the barier alignment resulting in
> checkpoint
> > delayed or expired. Second it can not confirm source consumption
> alignment
> > very precisely, and it is just a best effort way. So we gave up this way
> > finally.
> > >
> > > 2. Add the new component of SourceCoordinator to coordinate the source
> > consumption distributedly. For example we can start this componnet in the
> > JobManager like the current role of CheckpointCoordinator. Then every
> > source task would commnicate with JobManager via current RPC mechanism,
> > maybe we can rely on the heartbeat message to attach the consumption
> > progress as the payloads. The JobManagerwill accumulator or state all the
> > reported progress and then give responses for different source tasks. We
> > can define a protocol for indicating the fast soruce task to sleep for
> > specific time for example. To do so, the coordinator has the global
> > informations to give the proper decision for individuals, so it seems
> more
> > precise. And it will not affect the barrier alignment, because the
> sleeping
> > fast source can release the lock to emit barrier as normal. The only
> > concern is the changes for source interface and may affect all related
> > source implementations.
> > >
> > > Currently we prefer to the second way to implement and will refer to
> > other good points above. :)
> > >
> > > Best,
> > > Zhijiang
> > > ------------------------------------------------------------------
> > > 发件人:Jamie Grier <jg...@lyft.com.INVALID>
> > > 发送时间:2018年10月17日(星期三) 03:28
> > > 收件人:dev <de...@flink.apache.org>
> > > 主 题:Re: Sharing state between subtasks
> > >
> > > Here's a doc I started describing some changes we would like to make
> > > starting with the Kinesis Source.. It describes a refactoring of that
> > code
> > > specifically and also hopefully a pattern and some reusable code we can
> > use
> > > in the other sources as well.  The end goal would be best-effort
> > event-time
> > > synchronization across all Flink sources but we are going to start with
> > the
> > > Kinesis Source first.
> > >
> > > Please take a look and please provide thoughts and opinions about the
> > best
> > > state sharing mechanism to use -- that section is left blank and we're
> > > especially looking for input there.
> > >
> > >
> >
> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
> > >
> > > -Jamie
> > >
> > >
> > > On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <tr...@apache.org>
> > wrote:
> > >
> > >> But on the Kafka source level it should be perfectly fine to do what
> > Elias
> > >> proposed. This is of course is not the perfect solution but could
> bring
> > us
> > >> forward quite a bit. The changes required for this should also be
> > minimal.
> > >> This would become obsolete once we have something like shared state.
> But
> > >> until then, I think it would worth a try.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <aljoscha@apache.org
> >
> > >> wrote:
> > >>
> > >>> The reason this selective reading doesn't work well in Flink in the
> > >> moment
> > >>> is because of checkpointing. For checkpointing, checkpoint barriers
> > >> travel
> > >>> within the streams. If we selectively read from inputs based on
> > >> timestamps
> > >>> this is akin to blocking an input if that input is very far ahead in
> > >> event
> > >>> time, which can happen when you have a very fast source and a slow
> > source
> > >>> (in event time), maybe because you're in a catchup phase. In those
> > cases
> > >>> it's better to simply not read the data at the sources, as Thomas
> said.
> > >>> This is also because with Kafka Streams, each operator is basically
> its
> > >> own
> > >>> job: it's reading from Kafka and writing to Kafka and there is not a
> > >>> complex graph of different operations with network shuffles in
> between,
> > >> as
> > >>> you have with Flink.
> > >>>
> > >>> This different nature of Flink is also why I think that readers need
> > >>> awareness of other readers to do the event-time alignment, and this
> is
> > >>> where shared state comes in.
> > >>>
> > >>>> On 10. Oct 2018, at 20:47, Elias Levy <fe...@gmail.com>
> > >>> wrote:
> > >>>>
> > >>>> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <fh...@gmail.com>
> > >> wrote:
> > >>>>
> > >>>>> I think the new source interface would be designed to be able to
> > >>> leverage
> > >>>>> shared state to achieve time alignment.
> > >>>>> I don't think this would be possible without some kind of shared
> > >> state.
> > >>>>>
> > >>>>> The problem of tasks that are far ahead in time cannot be solved
> with
> > >>>>> back-pressure.
> > >>>>> That's because a task cannot choose from which source task it
> accepts
> > >>>>> events and from which doesn't.
> > >>>>> If it blocks an input, all downstream tasks that are connected to
> the
> > >>>>> operator are affected. This can easily lead to deadlocks.
> > >>>>> Therefore, all operators need to be able to handle events when they
> > >>> arrive.
> > >>>>> If they cannot process them yet because they are too far ahead in
> > >> time,
> > >>>>> they are put in state.
> > >>>>>
> > >>>>
> > >>>> The idea I was suggesting is not for operators to block an input.
> > >>> Rather,
> > >>>> it is that they selectively choose from which input to process the
> > next
> > >>>> message from based on their timestamp, so long as there are buffered
> > >>>> messages waiting to be processed.  That is a best-effort alignment
> > >>>> strategy.  Seems to work relatively well in practice, at least
> within
> > >>> Kafka
> > >>>> Streams.
> > >>>>
> > >>>> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for
> > >> both
> > >>>> its inputs.  Instead, it could keep them separate and selectively
> > >> consume
> > >>>> from the one that had a buffer available, and if both have buffers
> > >>>> available, from the buffer with the messages with a lower timestamp.
> > >>>
> > >>>
> > >>
> > >
> >
> >
> >
>

Re: Sharing state between subtasks

Posted by Thomas Weise <th...@apache.org>.
Hi,

We are planning to work on the Kinesis consumer in the following order:

1. Add per shard watermarking:
https://issues.apache.org/jira/browse/FLINK-5697 - this will be code we
already use internally; I will open a PR to add it to the Flink Kinesis
consumer
2. Exchange of per subtask watermarks between all subtasks of one or
multiple sources
3. Implement queue approach described in Jamie's document in to utilize 1.)
and 2.) to align the shard consumers WRT event time

There was some discussion regarding the mechanism to share the watermarks
between subtasks. If there is something that can be re-used it would be
great. Otherwise I'm going to further investigate the Akka or JGroups
route. Regarding Akka, since it is used within Flink already, is there an
abstraction that you would recommend to consider to avoid direct dependency?

Thanks,
Thomas



On Thu, Oct 18, 2018 at 8:34 PM Zhijiang(wangzhijiang999)
<wa...@aliyun.com.invalid> wrote:

> Not yet. We only have some initial thoughts and have not worked on it yet.
> We will update the progress in this discussion if have.
>
> Best,
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Aljoscha Krettek <al...@apache.org>
> 发送时间:2018年10月18日(星期四) 17:53
> 收件人:dev <de...@flink.apache.org>; Zhijiang(wangzhijiang999) <
> wangzhijiang999@aliyun.com>
> 抄 送:Till Rohrmann <tr...@apache.org>
> 主 题:Re: Sharing state between subtasks
>
> Hi Zhijiang,
>
> do you already have working code or a design doc for the second approach?
>
> Best,
> Aljoscha
>
> > On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) <
> wangzhijiang999@aliyun.com.INVALID> wrote:
> >
> > Just noticed this discussion from @Till Rohrmann's weekly community
> update and I want to share some thoughts from our experiences.
> >
> > We also encountered the source consuption skew issue before, and we are
> focused on improving this by two possible ways.
> >
> > 1. Control the read strategy by the downstream side. In detail, every
> input channel in downstream task corresponds to the consumption of one
> upstream source task, and we will tag each input channel with watermark to
> find the lowest channel to read in high priority. In essence, we actually
> rely on the mechanism of backpressure. If the channel with highest
> timestamp is not read by downstream task for a while, it will block the
> corresponding source task to read when the buffers are exhausted. It is no
> need to change the source interface in this way, but there are two major
> concerns: first it will affect the barier alignment resulting in checkpoint
> delayed or expired. Second it can not confirm source consumption alignment
> very precisely, and it is just a best effort way. So we gave up this way
> finally.
> >
> > 2. Add the new component of SourceCoordinator to coordinate the source
> consumption distributedly. For example we can start this componnet in the
> JobManager like the current role of CheckpointCoordinator. Then every
> source task would commnicate with JobManager via current RPC mechanism,
> maybe we can rely on the heartbeat message to attach the consumption
> progress as the payloads. The JobManagerwill accumulator or state all the
> reported progress and then give responses for different source tasks. We
> can define a protocol for indicating the fast soruce task to sleep for
> specific time for example. To do so, the coordinator has the global
> informations to give the proper decision for individuals, so it seems more
> precise. And it will not affect the barrier alignment, because the sleeping
> fast source can release the lock to emit barrier as normal. The only
> concern is the changes for source interface and may affect all related
> source implementations.
> >
> > Currently we prefer to the second way to implement and will refer to
> other good points above. :)
> >
> > Best,
> > Zhijiang
> > ------------------------------------------------------------------
> > 发件人:Jamie Grier <jg...@lyft.com.INVALID>
> > 发送时间:2018年10月17日(星期三) 03:28
> > 收件人:dev <de...@flink.apache.org>
> > 主 题:Re: Sharing state between subtasks
> >
> > Here's a doc I started describing some changes we would like to make
> > starting with the Kinesis Source.. It describes a refactoring of that
> code
> > specifically and also hopefully a pattern and some reusable code we can
> use
> > in the other sources as well.  The end goal would be best-effort
> event-time
> > synchronization across all Flink sources but we are going to start with
> the
> > Kinesis Source first.
> >
> > Please take a look and please provide thoughts and opinions about the
> best
> > state sharing mechanism to use -- that section is left blank and we're
> > especially looking for input there.
> >
> >
> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
> >
> > -Jamie
> >
> >
> > On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <tr...@apache.org>
> wrote:
> >
> >> But on the Kafka source level it should be perfectly fine to do what
> Elias
> >> proposed. This is of course is not the perfect solution but could bring
> us
> >> forward quite a bit. The changes required for this should also be
> minimal.
> >> This would become obsolete once we have something like shared state. But
> >> until then, I think it would worth a try.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <al...@apache.org>
> >> wrote:
> >>
> >>> The reason this selective reading doesn't work well in Flink in the
> >> moment
> >>> is because of checkpointing. For checkpointing, checkpoint barriers
> >> travel
> >>> within the streams. If we selectively read from inputs based on
> >> timestamps
> >>> this is akin to blocking an input if that input is very far ahead in
> >> event
> >>> time, which can happen when you have a very fast source and a slow
> source
> >>> (in event time), maybe because you're in a catchup phase. In those
> cases
> >>> it's better to simply not read the data at the sources, as Thomas said.
> >>> This is also because with Kafka Streams, each operator is basically its
> >> own
> >>> job: it's reading from Kafka and writing to Kafka and there is not a
> >>> complex graph of different operations with network shuffles in between,
> >> as
> >>> you have with Flink.
> >>>
> >>> This different nature of Flink is also why I think that readers need
> >>> awareness of other readers to do the event-time alignment, and this is
> >>> where shared state comes in.
> >>>
> >>>> On 10. Oct 2018, at 20:47, Elias Levy <fe...@gmail.com>
> >>> wrote:
> >>>>
> >>>> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <fh...@gmail.com>
> >> wrote:
> >>>>
> >>>>> I think the new source interface would be designed to be able to
> >>> leverage
> >>>>> shared state to achieve time alignment.
> >>>>> I don't think this would be possible without some kind of shared
> >> state.
> >>>>>
> >>>>> The problem of tasks that are far ahead in time cannot be solved with
> >>>>> back-pressure.
> >>>>> That's because a task cannot choose from which source task it accepts
> >>>>> events and from which doesn't.
> >>>>> If it blocks an input, all downstream tasks that are connected to the
> >>>>> operator are affected. This can easily lead to deadlocks.
> >>>>> Therefore, all operators need to be able to handle events when they
> >>> arrive.
> >>>>> If they cannot process them yet because they are too far ahead in
> >> time,
> >>>>> they are put in state.
> >>>>>
> >>>>
> >>>> The idea I was suggesting is not for operators to block an input.
> >>> Rather,
> >>>> it is that they selectively choose from which input to process the
> next
> >>>> message from based on their timestamp, so long as there are buffered
> >>>> messages waiting to be processed.  That is a best-effort alignment
> >>>> strategy.  Seems to work relatively well in practice, at least within
> >>> Kafka
> >>>> Streams.
> >>>>
> >>>> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for
> >> both
> >>>> its inputs.  Instead, it could keep them separate and selectively
> >> consume
> >>>> from the one that had a buffer available, and if both have buffers
> >>>> available, from the buffer with the messages with a lower timestamp.
> >>>
> >>>
> >>
> >
>
>
>

回复:Sharing state between subtasks

Posted by "Zhijiang(wangzhijiang999)" <wa...@aliyun.com.INVALID>.
Not yet. We only have some initial thoughts and have not worked on it yet. We will update the progress in this discussion if have.

Best,
Zhijiang
------------------------------------------------------------------
发件人:Aljoscha Krettek <al...@apache.org>
发送时间:2018年10月18日(星期四) 17:53
收件人:dev <de...@flink.apache.org>; Zhijiang(wangzhijiang999) <wa...@aliyun.com>
抄 送:Till Rohrmann <tr...@apache.org>
主 题:Re: Sharing state between subtasks

Hi Zhijiang,

do you already have working code or a design doc for the second approach?

Best,
Aljoscha

> On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) <wa...@aliyun.com.INVALID> wrote:
> 
> Just noticed this discussion from @Till Rohrmann's weekly community update and I want to share some thoughts from our experiences.
> 
> We also encountered the source consuption skew issue before, and we are focused on improving this by two possible ways.
> 
> 1. Control the read strategy by the downstream side. In detail, every input channel in downstream task corresponds to the consumption of one upstream source task, and we will tag each input channel with watermark to find the lowest channel to read in high priority. In essence, we actually rely on the mechanism of backpressure. If the channel with highest timestamp is not read by downstream task for a while, it will block the corresponding source task to read when the buffers are exhausted. It is no need to change the source interface in this way, but there are two major concerns: first it will affect the barier alignment resulting in checkpoint delayed or expired. Second it can not confirm source consumption alignment very precisely, and it is just a best effort way. So we gave up this way finally.
> 
> 2. Add the new component of SourceCoordinator to coordinate the source consumption distributedly. For example we can start this componnet in the JobManager like the current role of CheckpointCoordinator. Then every source task would commnicate with JobManager via current RPC mechanism, maybe we can rely on the heartbeat message to attach the consumption progress as the payloads. The JobManagerwill accumulator or state all the reported progress and then give responses for different source tasks. We can define a protocol for indicating the fast soruce task to sleep for specific time for example. To do so, the coordinator has the global informations to give the proper decision for individuals, so it seems more precise. And it will not affect the barrier alignment, because the sleeping fast source can release the lock to emit barrier as normal. The only concern is the changes for source interface and may affect all related source implementations.
> 
> Currently we prefer to the second way to implement and will refer to other good points above. :)
> 
> Best,
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Jamie Grier <jg...@lyft.com.INVALID>
> 发送时间:2018年10月17日(星期三) 03:28
> 收件人:dev <de...@flink.apache.org>
> 主 题:Re: Sharing state between subtasks
> 
> Here's a doc I started describing some changes we would like to make
> starting with the Kinesis Source.. It describes a refactoring of that code
> specifically and also hopefully a pattern and some reusable code we can use
> in the other sources as well.  The end goal would be best-effort event-time
> synchronization across all Flink sources but we are going to start with the
> Kinesis Source first.
> 
> Please take a look and please provide thoughts and opinions about the best
> state sharing mechanism to use -- that section is left blank and we're
> especially looking for input there.
> 
> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
> 
> -Jamie
> 
> 
> On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <tr...@apache.org> wrote:
> 
>> But on the Kafka source level it should be perfectly fine to do what Elias
>> proposed. This is of course is not the perfect solution but could bring us
>> forward quite a bit. The changes required for this should also be minimal.
>> This would become obsolete once we have something like shared state. But
>> until then, I think it would worth a try.
>> 
>> Cheers,
>> Till
>> 
>> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <al...@apache.org>
>> wrote:
>> 
>>> The reason this selective reading doesn't work well in Flink in the
>> moment
>>> is because of checkpointing. For checkpointing, checkpoint barriers
>> travel
>>> within the streams. If we selectively read from inputs based on
>> timestamps
>>> this is akin to blocking an input if that input is very far ahead in
>> event
>>> time, which can happen when you have a very fast source and a slow source
>>> (in event time), maybe because you're in a catchup phase. In those cases
>>> it's better to simply not read the data at the sources, as Thomas said.
>>> This is also because with Kafka Streams, each operator is basically its
>> own
>>> job: it's reading from Kafka and writing to Kafka and there is not a
>>> complex graph of different operations with network shuffles in between,
>> as
>>> you have with Flink.
>>> 
>>> This different nature of Flink is also why I think that readers need
>>> awareness of other readers to do the event-time alignment, and this is
>>> where shared state comes in.
>>> 
>>>> On 10. Oct 2018, at 20:47, Elias Levy <fe...@gmail.com>
>>> wrote:
>>>> 
>>>> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <fh...@gmail.com>
>> wrote:
>>>> 
>>>>> I think the new source interface would be designed to be able to
>>> leverage
>>>>> shared state to achieve time alignment.
>>>>> I don't think this would be possible without some kind of shared
>> state.
>>>>> 
>>>>> The problem of tasks that are far ahead in time cannot be solved with
>>>>> back-pressure.
>>>>> That's because a task cannot choose from which source task it accepts
>>>>> events and from which doesn't.
>>>>> If it blocks an input, all downstream tasks that are connected to the
>>>>> operator are affected. This can easily lead to deadlocks.
>>>>> Therefore, all operators need to be able to handle events when they
>>> arrive.
>>>>> If they cannot process them yet because they are too far ahead in
>> time,
>>>>> they are put in state.
>>>>> 
>>>> 
>>>> The idea I was suggesting is not for operators to block an input.
>>> Rather,
>>>> it is that they selectively choose from which input to process the next
>>>> message from based on their timestamp, so long as there are buffered
>>>> messages waiting to be processed.  That is a best-effort alignment
>>>> strategy.  Seems to work relatively well in practice, at least within
>>> Kafka
>>>> Streams.
>>>> 
>>>> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for
>> both
>>>> its inputs.  Instead, it could keep them separate and selectively
>> consume
>>>> from the one that had a buffer available, and if both have buffers
>>>> available, from the buffer with the messages with a lower timestamp.
>>> 
>>> 
>> 
> 



Re: Sharing state between subtasks

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Zhijiang,

do you already have working code or a design doc for the second approach?

Best,
Aljoscha

> On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) <wa...@aliyun.com.INVALID> wrote:
> 
> Just noticed this discussion from @Till Rohrmann's weekly community update and I want to share some thoughts from our experiences.
> 
> We also encountered the source consuption skew issue before, and we are focused on improving this by two possible ways.
> 
> 1. Control the read strategy by the downstream side. In detail, every input channel in downstream task corresponds to the consumption of one upstream source task, and we will tag each input channel with watermark to find the lowest channel to read in high priority. In essence, we actually rely on the mechanism of backpressure. If the channel with highest timestamp is not read by downstream task for a while, it will block the corresponding source task to read when the buffers are exhausted. It is no need to change the source interface in this way, but there are two major concerns: first it will affect the barier alignment resulting in checkpoint delayed or expired. Second it can not confirm source consumption alignment very precisely, and it is just a best effort way. So we gave up this way finally.
> 
> 2. Add the new component of SourceCoordinator to coordinate the source consumption distributedly. For example we can start this componnet in the JobManager like the current role of CheckpointCoordinator. Then every source task would commnicate with JobManager via current RPC mechanism, maybe we can rely on the heartbeat message to attach the consumption progress as the payloads. The JobManagerwill accumulator or state all the reported progress and then give responses for different source tasks. We can define a protocol for indicating the fast soruce task to sleep for specific time for example. To do so, the coordinator has the global informations to give the proper decision for individuals, so it seems more precise. And it will not affect the barrier alignment, because the sleeping fast source can release the lock to emit barrier as normal. The only concern is the changes for source interface and may affect all related source implementations.
> 
> Currently we prefer to the second way to implement and will refer to other good points above. :)
> 
> Best,
> Zhijiang
> ------------------------------------------------------------------
> 发件人:Jamie Grier <jg...@lyft.com.INVALID>
> 发送时间:2018年10月17日(星期三) 03:28
> 收件人:dev <de...@flink.apache.org>
> 主 题:Re: Sharing state between subtasks
> 
> Here's a doc I started describing some changes we would like to make
> starting with the Kinesis Source.. It describes a refactoring of that code
> specifically and also hopefully a pattern and some reusable code we can use
> in the other sources as well.  The end goal would be best-effort event-time
> synchronization across all Flink sources but we are going to start with the
> Kinesis Source first.
> 
> Please take a look and please provide thoughts and opinions about the best
> state sharing mechanism to use -- that section is left blank and we're
> especially looking for input there.
> 
> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
> 
> -Jamie
> 
> 
> On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <tr...@apache.org> wrote:
> 
>> But on the Kafka source level it should be perfectly fine to do what Elias
>> proposed. This is of course is not the perfect solution but could bring us
>> forward quite a bit. The changes required for this should also be minimal.
>> This would become obsolete once we have something like shared state. But
>> until then, I think it would worth a try.
>> 
>> Cheers,
>> Till
>> 
>> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <al...@apache.org>
>> wrote:
>> 
>>> The reason this selective reading doesn't work well in Flink in the
>> moment
>>> is because of checkpointing. For checkpointing, checkpoint barriers
>> travel
>>> within the streams. If we selectively read from inputs based on
>> timestamps
>>> this is akin to blocking an input if that input is very far ahead in
>> event
>>> time, which can happen when you have a very fast source and a slow source
>>> (in event time), maybe because you're in a catchup phase. In those cases
>>> it's better to simply not read the data at the sources, as Thomas said.
>>> This is also because with Kafka Streams, each operator is basically its
>> own
>>> job: it's reading from Kafka and writing to Kafka and there is not a
>>> complex graph of different operations with network shuffles in between,
>> as
>>> you have with Flink.
>>> 
>>> This different nature of Flink is also why I think that readers need
>>> awareness of other readers to do the event-time alignment, and this is
>>> where shared state comes in.
>>> 
>>>> On 10. Oct 2018, at 20:47, Elias Levy <fe...@gmail.com>
>>> wrote:
>>>> 
>>>> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <fh...@gmail.com>
>> wrote:
>>>> 
>>>>> I think the new source interface would be designed to be able to
>>> leverage
>>>>> shared state to achieve time alignment.
>>>>> I don't think this would be possible without some kind of shared
>> state.
>>>>> 
>>>>> The problem of tasks that are far ahead in time cannot be solved with
>>>>> back-pressure.
>>>>> That's because a task cannot choose from which source task it accepts
>>>>> events and from which doesn't.
>>>>> If it blocks an input, all downstream tasks that are connected to the
>>>>> operator are affected. This can easily lead to deadlocks.
>>>>> Therefore, all operators need to be able to handle events when they
>>> arrive.
>>>>> If they cannot process them yet because they are too far ahead in
>> time,
>>>>> they are put in state.
>>>>> 
>>>> 
>>>> The idea I was suggesting is not for operators to block an input.
>>> Rather,
>>>> it is that they selectively choose from which input to process the next
>>>> message from based on their timestamp, so long as there are buffered
>>>> messages waiting to be processed.  That is a best-effort alignment
>>>> strategy.  Seems to work relatively well in practice, at least within
>>> Kafka
>>>> Streams.
>>>> 
>>>> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for
>> both
>>>> its inputs.  Instead, it could keep them separate and selectively
>> consume
>>>> from the one that had a buffer available, and if both have buffers
>>>> available, from the buffer with the messages with a lower timestamp.
>>> 
>>> 
>> 
>