You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Martin Eden <ma...@gmail.com> on 2017/05/08 05:08:39 UTC

Re: Writing a reliable Flink source for a NON-replay-able queue/protocol that supports message ACKs

Hi Kostas,

Thanks for pointing me in the right direction.

I have gone and extended MessageAcknowledgingSourceBase. It was quite easy
to do.

I have however some follow-up questions about the guarantees it gives
and testing my solution.

1. Guarantees:

*Questions:*
a. When the acknowledgeIDs method is called, is it certain that all the
rest of the operators, including the Sinks finished successfully? E.g: If I
have a sink that writes to MySQL/Cassandra and one that writes to
SQS/Kafka, will the writes to both of these systems have been completed
successfully before acknowledgeIDs is called?
b. Messages can be duplicated in case the processing takes longer than the
queue timeout or if there are failures and Flink needs to recover. This is
a problem for sinks that write to non-idempotent systems e.g. SQS, Kafka.
What are the recommended approaches to avoid this duplication? Are there
any example repos?

2. Testing:

*Work done so far:*
In order to convince myself that indeed this source is reliable, I wrote
some integration tests. I used LocalFlinkMiniCluster which is quite nice.
However when I tried to test what happens when I kill the TaskManager
running the task that is executing my MessageAcknowledgingSourceBase I
found it not so straight forward. I managed to get around it by starting
the cluster and the job, getting all the task manager actor references,
adding a new task manager to the cluster, killing the initial task managers
by sending a poison pill actor msg. I had to kill all the initial task
managers as I did not find a way to get a mapping between task running the
source and the task manager actor to which it got assigned.

*Questions:*
a. Is there some better api for fine grained killing of various services,
tasks, resources in a Flink cluster or job?
b. Can you point me to a repo with reliability tests for Flink i.e. where
things are killed to see whether the system recovers etc?

Thanks,
M


On Tue, Apr 25, 2017 at 9:23 AM, Kostas Kloudas <k.kloudas@data-artisans.com
> wrote:

> Hi Martin!
>
> For an example of a source that acknowledges received messages, you could
> check the MessageAcknowledgingSourceBase
> and the MultipleIdsMessageAcknowledgingSourceBase that ship with Flink. I
> hope this will give you some ideas.
>
> Now for the Flink version on top of which to implement your source, I
> would suggest the Flink 1.3. The reason is that it will
> come out soon (~1 month) and it will include a lot of new features and
> bug-fixes. Until then, it may change a bit, but the APIs
> that you will be using, will not change.
>
> So why not going straight for the more future-proof way?
>
> Thanks,
> Kostas
>
> > On Apr 24, 2017, at 11:20 PM, Martin Eden <ma...@gmail.com>
> wrote:
> >
> > Hi everyone,
> >
> > Are there any examples of how to implement a reliable (zero data loss)
> Flink source reading from a system that is not replay-able but supports
> acknowledging messages?
> >
> > Or any pointers of how one can achieve this and how Flink can help?
> >
> > I imagine it should involve a write ahead log but not yet clear of how
> to implement it and how to integrate with the Flink fault tolerance
> mechanism. Can Flink maintain the write ahead log for me?
> >
> > Also, does it make sense to start implementing this in the current
> stable Flink release 1.2 or is there any advantage in implementing it
> directly in Flink 1.3 since it is coming up soon anyway?
> >
> > Thanks,
> > M
>
>

Re: Writing a reliable Flink source for a NON-replay-able queue/protocol that supports message ACKs

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Martin!

Let me try to follow-up some of your questions :)

a. When the acknowledgeIDs method is called, is it certain that all the rest of the operators, including the Sinks finished successfully? E.g: If I have a sink that writes to MySQL/Cassandra and one that writes to SQS/Kafka, will the writes to both of these systems have been completed successfully before acknowledgeIDs is called?
That is correct. The `MessageAcknowledgingSourceBase#acknowledgeIDs` is basically wrapped within a `notifyCheckpointComplete()` call. Checkpoints are only notified to be completed when all sinks have finished their snapshot for the checkpoint.

For sinks like Cassandra and Kafka, currently what the snapshot method does is flush all in-flight pending requests to write to the external system. So yes, you can be sure that the writes for the acknowledged IDs have been completed by all sinks.

b. Messages can be duplicated in case the processing takes longer than the queue timeout or if there are failures and Flink needs to recover. This is a problem for sinks that write to non-idempotent systems e.g. SQS, Kafka. What are the recommended approaches to avoid this duplication? Are there any example repos?
Duplication is a general problem for non-idempotent pipeline setups, and is not easy to avoid when the external sink does not support transactions. I’m not aware of any cookbook solutions for this.

a. Is there some better api for fine grained killing of various services, tasks, resources in a Flink cluster or job?
b. Can you point me to a repo with reliability tests for Flink i.e. where things are killed to see whether the system recovers etc?
The Flink Kafka consumer actually has quite a few tests for exactly-once guarantees. You can take a look at them here [1]. Specifically, take a look at the `testOneToOneSources`, `testOneSourceMultiplePartitions`, etc. tests. I think they are quite good examples of how to write tests for exactly-once testing.



- Gordon

[1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java



On 8 May 2017 at 1:08:44 PM, Martin Eden (martineden131@gmail.com) wrote:

Hi Kostas,

Thanks for pointing me in the right direction.

I have gone and extended MessageAcknowledgingSourceBase. It was quite easy to do.

I have however some follow-up questions about the guarantees it gives and testing my solution.

1. Guarantees:

Questions:
a. When the acknowledgeIDs method is called, is it certain that all the rest of the operators, including the Sinks finished successfully? E.g: If I have a sink that writes to MySQL/Cassandra and one that writes to SQS/Kafka, will the writes to both of these systems have been completed successfully before acknowledgeIDs is called?
b. Messages can be duplicated in case the processing takes longer than the queue timeout or if there are failures and Flink needs to recover. This is a problem for sinks that write to non-idempotent systems e.g. SQS, Kafka. What are the recommended approaches to avoid this duplication? Are there any example repos?

2. Testing:

Work done so far:
In order to convince myself that indeed this source is reliable, I wrote some integration tests. I used LocalFlinkMiniCluster which is quite nice. However when I tried to test what happens when I kill the TaskManager running the task that is executing my MessageAcknowledgingSourceBase I found it not so straight forward. I managed to get around it by starting the cluster and the job, getting all the task manager actor references, adding a new task manager to the cluster, killing the initial task managers by sending a poison pill actor msg. I had to kill all the initial task managers as I did not find a way to get a mapping between task running the source and the task manager actor to which it got assigned.

Questions:
a. Is there some better api for fine grained killing of various services, tasks, resources in a Flink cluster or job?
b. Can you point me to a repo with reliability tests for Flink i.e. where things are killed to see whether the system recovers etc?

Thanks,
M


On Tue, Apr 25, 2017 at 9:23 AM, Kostas Kloudas <k....@data-artisans.com> wrote:
Hi Martin!

For an example of a source that acknowledges received messages, you could check the MessageAcknowledgingSourceBase
and the MultipleIdsMessageAcknowledgingSourceBase that ship with Flink. I hope this will give you some ideas.

Now for the Flink version on top of which to implement your source, I would suggest the Flink 1.3. The reason is that it will
come out soon (~1 month) and it will include a lot of new features and bug-fixes. Until then, it may change a bit, but the APIs
that you will be using, will not change.

So why not going straight for the more future-proof way?

Thanks,
Kostas

> On Apr 24, 2017, at 11:20 PM, Martin Eden <ma...@gmail.com> wrote:
>
> Hi everyone,
>
> Are there any examples of how to implement a reliable (zero data loss) Flink source reading from a system that is not replay-able but supports acknowledging messages?
>
> Or any pointers of how one can achieve this and how Flink can help?
>
> I imagine it should involve a write ahead log but not yet clear of how to implement it and how to integrate with the Flink fault tolerance mechanism. Can Flink maintain the write ahead log for me?
>
> Also, does it make sense to start implementing this in the current stable Flink release 1.2 or is there any advantage in implementing it directly in Flink 1.3 since it is coming up soon anyway?
>
> Thanks,
> M