You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by ming li <jo...@gmail.com> on 2023/02/27 12:34:42 UTC

[DISCUSS] SourceCoordinator and ExternallyInducedSourceReader do not work well together

Hi, dev,

We recently used *SourceCoordinator* and *ExternallyInducedSource* to work
together on some file type connectors to fulfill some requirements, but we
found that these two interfaces do not work well together.

*SourceCoordinator* (FLINK-15101) and *ExternallyInducedSource*
(FLINK-20270) were introduced in Flip27. *SourceCoordinator* is responsible
for running *SplitEnumerator* and coordinating the allocation of *Split*.
*ExternallyInducedSource* allows us to delay making a c*heckpoint* in
Source or make a c*heckpoint* at specified data. This works fine with
connectors like *Kafka*.

But in some connectors (such as hive connector), the split is completely
allocated by the *SourceCoordinator*, and after the consumption is
completed, it needs to wait for the allocation of the next batch of splits
(it is not like kafka that continuously consumes the same split). In
FLINK-28606, we introduced another mechanism: the *OperatorCoordinator* is
not allowed to send *OperatorEvents* to the *Operator* before the
*Operator's* checkpoint is completed.

Considering this scenario, if the data we want has not been produced yet,
but the *SourceCoordinator* receives the c*heckpoint* message, it will
directly make a *checkpoint*, and the *ExternallyInducedSource* will not
make a *checkpoint* immediately after receiving the *checkpoint*, but
continues to wait for a new split. Even if a new split is generated, due to
the behavior of closing *gateway* in *FLINK-28606*, the new split cannot be
assigned to the *Source*, resulting in a deadlock (or forced to wait for
checkpoint to time out).

So should we also add a mechanism similar to *ExternallyInducedSource* in
*OperatorCoordinator*: only make a checkpoint on *OperatorCoordinator* when
*OperatorCoordinator* is ready, which allows us to delay making checkpoint?

[1] https://issues.apache.org/jira/browse/FLINK-15101
[2] https://issues.apache.org/jira/browse/FLINK-20270
[3] https://issues.apache.org/jira/browse/FLINK-28606

Re: [DISCUSS] SourceCoordinator and ExternallyInducedSourceReader do not work well together

Posted by ming li <jo...@gmail.com>.
Hi, Jiangjie,

Thanks for your reply and suggestion.

In fact, we don't want to modify the way JM triggers checkpoint, but we
hope to give OperatorCoodinator a mechanism similar to
ExternallyInducedSourceReader to coordinate the sending timing of
checkpoint barrier (just advance from Source to OperatorCoodinator). We
hope that the produced data and Checkpoint have a one-to-one mapping. If
there is such a mechanism, the difficulty of programming and design can be
greatly simplified.

In addition, I am not sure if there is the same need in other
OperatorCoordinator, because we always make a snapshot of
OperatorCoordinator immediately.

Thanks,
Ming Li


Becket Qin <be...@gmail.com> 于2023年2月28日周二 08:31写道:

> Hi Ming,
>
> I am not sure if I fully understand what you want. It seems what you are
> looking for is to have a checkpoint triggered at a customized timing which
> aligns with some semantic. This is not what the current checkpoint in Flink
> was designed for. I think the basic idea of checkpoint is to just take a
> snapshot of the current state, so we can restore to that state in case of
> failure. This is completely orthogonal to the data semantic.
>
> Even with the ExternallyInducedSourceReader, the checkpoint is still
> triggered by the JM. It is just the effective checkpoint barrier message (a
> custom message in this case) will not be sent by the JM, but by the
> external source storage. This helps when the external source storage needs
> its own internal state to be aligned with the state of the Flink
> SourceReader. For example, if the external source storage can only seek at
> some bulk boundary, then it might wait until the current bulk to finish
> before it sends the custom checkpoint barrier to the SourceReader.
>
>  Considering this scenario, if the data we want has not been produced yet,
> > but the *SourceCoordinator* receives the c*heckpoint* message, it will
> > directly make a *checkpoint*, and the *ExternallyInducedSource* will not
> > make a *checkpoint* immediately after receiving the *checkpoint*, but
> > continues to wait for a new split. Even if a new split is generated, due
> to
> > the behavior of closing *gateway* in *FLINK-28606*, the new split cannot
> be
> > assigned to the *Source*, resulting in a deadlock (or forced to wait for
> > checkpoint to time out).
>
>
> In this case, the source reader should not "wait" for the splits that are
> not included in this checkpoint. These splits should be a part of the next
> checkpoint. It would be the Sink's responsibility to ensure the output is
> committed in a way that aligns with the user semantic.
>
> That said, I agree it might be useful in some cases if users can decided
> the checkpoint triggering timing. But that will be a new feature which
> needs some careful design.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>

Re: [DISCUSS] SourceCoordinator and ExternallyInducedSourceReader do not work well together

Posted by Becket Qin <be...@gmail.com>.
Hi Ming,

I am not sure if I fully understand what you want. It seems what you are
looking for is to have a checkpoint triggered at a customized timing which
aligns with some semantic. This is not what the current checkpoint in Flink
was designed for. I think the basic idea of checkpoint is to just take a
snapshot of the current state, so we can restore to that state in case of
failure. This is completely orthogonal to the data semantic.

Even with the ExternallyInducedSourceReader, the checkpoint is still
triggered by the JM. It is just the effective checkpoint barrier message (a
custom message in this case) will not be sent by the JM, but by the
external source storage. This helps when the external source storage needs
its own internal state to be aligned with the state of the Flink
SourceReader. For example, if the external source storage can only seek at
some bulk boundary, then it might wait until the current bulk to finish
before it sends the custom checkpoint barrier to the SourceReader.

 Considering this scenario, if the data we want has not been produced yet,
> but the *SourceCoordinator* receives the c*heckpoint* message, it will
> directly make a *checkpoint*, and the *ExternallyInducedSource* will not
> make a *checkpoint* immediately after receiving the *checkpoint*, but
> continues to wait for a new split. Even if a new split is generated, due to
> the behavior of closing *gateway* in *FLINK-28606*, the new split cannot be
> assigned to the *Source*, resulting in a deadlock (or forced to wait for
> checkpoint to time out).


In this case, the source reader should not "wait" for the splits that are
not included in this checkpoint. These splits should be a part of the next
checkpoint. It would be the Sink's responsibility to ensure the output is
committed in a way that aligns with the user semantic.

That said, I agree it might be useful in some cases if users can decided
the checkpoint triggering timing. But that will be a new feature which
needs some careful design.

Thanks,

Jiangjie (Becket) Qin


On Mon, Feb 27, 2023 at 8:35 PM ming li <jo...@gmail.com> wrote:

> Hi, dev,
>
> We recently used *SourceCoordinator* and *ExternallyInducedSource* to work
> together on some file type connectors to fulfill some requirements, but we
> found that these two interfaces do not work well together.
>
> *SourceCoordinator* (FLINK-15101) and *ExternallyInducedSource*
> (FLINK-20270) were introduced in Flip27. *SourceCoordinator* is responsible
> for running *SplitEnumerator* and coordinating the allocation of *Split*.
> *ExternallyInducedSource* allows us to delay making a c*heckpoint* in
> Source or make a c*heckpoint* at specified data. This works fine with
> connectors like *Kafka*.
>
> But in some connectors (such as hive connector), the split is completely
> allocated by the *SourceCoordinator*, and after the consumption is
> completed, it needs to wait for the allocation of the next batch of splits
> (it is not like kafka that continuously consumes the same split). In
> FLINK-28606, we introduced another mechanism: the *OperatorCoordinator* is
> not allowed to send *OperatorEvents* to the *Operator* before the
> *Operator's* checkpoint is completed.
>
> Considering this scenario, if the data we want has not been produced yet,
> but the *SourceCoordinator* receives the c*heckpoint* message, it will
> directly make a *checkpoint*, and the *ExternallyInducedSource* will not
> make a *checkpoint* immediately after receiving the *checkpoint*, but
> continues to wait for a new split. Even if a new split is generated, due to
> the behavior of closing *gateway* in *FLINK-28606*, the new split cannot be
> assigned to the *Source*, resulting in a deadlock (or forced to wait for
> checkpoint to time out).
>
> So should we also add a mechanism similar to *ExternallyInducedSource* in
> *OperatorCoordinator*: only make a checkpoint on *OperatorCoordinator* when
> *OperatorCoordinator* is ready, which allows us to delay making checkpoint?
>
> [1] https://issues.apache.org/jira/browse/FLINK-15101
> [2] https://issues.apache.org/jira/browse/FLINK-20270
> [3] https://issues.apache.org/jira/browse/FLINK-28606
>