Posted to user@flink.apache.org by Ning Shi <ni...@gmail.com> on 2018/10/21 23:26:34 UTC

Clean shutdown of streaming job

I'm implementing a streaming job that consumes events from Kafka and
writes results to Cassandra. The writes to Cassandra are not
idempotent. In preparation for planned maintenance events, such as a
Flink version upgrade or a job update, I'd like to be able to shut down
the job cleanly.

I understand that cancelling with a savepoint is currently not an atomic
operation, meaning that one or more events may still be processed after
the savepoint is taken. To prevent any writes from reaching Cassandra
after the savepoint is taken, I wonder if it's possible to pause the
Kafka stream before I take the savepoint.

I've seen people suggest using a control stream and unioning it with
the Kafka stream to achieve this, but that doesn't actually stop the
Kafka consumer offsets from advancing. I'm concerned that if I send the
signal to the control stream and start dropping messages from Kafka,
the offsets may still advance and the new offsets will be included in
the savepoint. As a result, recovering from the savepoint would cause
data loss.
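
For reference, this is roughly the shape of the control-stream approach
I mean. It's only a simplified sketch: the topic names, the String types
and the FlinkKafkaConsumer011 version are placeholders, and I've used
broadcast/connect instead of a plain union so the two streams don't need
the same type.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class ControlStreamSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-job");

        // Main event stream plus a separate "control" topic carrying a drain signal.
        DataStream<String> events = env.addSource(
                new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), props));
        DataStream<String> control = env.addSource(
                new FlinkKafkaConsumer011<>("control", new SimpleStringSchema(), props));

        // Broadcast the control signal so every parallel instance sees it, then
        // drop events once the drain signal has arrived. The Kafka source itself
        // keeps consuming, so the offsets recorded in the next savepoint move
        // past the dropped records -- which is exactly the data-loss concern.
        DataStream<String> gated = control.broadcast()
                .connect(events)
                .flatMap(new CoFlatMapFunction<String, String, String>() {
                    // Note: this flag is not checkpointed; it's a sketch only.
                    private boolean draining = false;

                    @Override
                    public void flatMap1(String signal, Collector<String> out) {
                        if ("DRAIN".equals(signal)) {
                            draining = true;
                        }
                    }

                    @Override
                    public void flatMap2(String event, Collector<String> out) {
                        if (!draining) {
                            out.collect(event);
                        }
                    }
                });

        gated.print(); // stand-in for the Cassandra sink

        env.execute("control-stream-sketch");
    }
}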

Is there any way to cleanly shut down a job or pause the Kafka source
prior to taking a savepoint?

Thanks,

--
Ning

Re: Clean shutdown of streaming job

Posted by Ning Shi <ni...@gmail.com>.
Hi Niels,

Thanks for the response.

> I think your problem is that the Cassandra sink doesn't support exactly
> once guarantees when the Cassandra query isn't idempotent. If possible, the
> cleanest solution would be implementing a new or extending the existing
> Cassandra sink with the
> https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html
> interface, and setting your environment to exactly-once guarantee.

You are right. The culprit is that the Cassandra queries are not
idempotent.

I did consider implementing a custom sink based on the two-phase commit
sink function. However, working with an external system that doesn't
have any transaction support is non-trivial. We would have to build undo
logs and roll them back from the application side in case the
transaction aborts.

That was what led me to think that pausing the Kafka stream might be
the simplest and cleanest solution here. It doesn't require the sink to
be exactly-once and still provides a clean shutdown approach, which may
have broader applications.

--
Ning

Re: Clean shutdown of streaming job

Posted by Niels van Kaam <ni...@vankaam.net>.
Hi Ning,

I don't think it is possible to pause a Kafka source upon taking a
savepoint without making any changes to the implementation.

I think your problem is that the Cassandra sink doesn't support exactly
once guarantees when the Cassandra query isn't idempotent. If possible, the
cleanest solution would be implementing a new or extending the existing
Cassandra sink with the
https://ci.apache.org/projects/flink/flink-docs-stable/api/java/index.html?org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html
interface, and setting your environment to exactly-once guarantee.
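
Roughly, such a sink would buffer or tentatively apply writes per
checkpoint and implement beginTransaction/preCommit/commit/abort. Below
is only a very rough skeleton to illustrate the shape; the class names,
the String event type and the buffering/undo handling are placeholders,
not a working Cassandra sink.

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Skeleton only. Because Cassandra has no transactions, commit/abort would
// have to be emulated with application-level undo information, which is the
// hard part you mention.
public class TwoPhaseCassandraSink
        extends TwoPhaseCommitSinkFunction<String, TwoPhaseCassandraSink.Txn, Void> {

    // State carried per checkpoint "transaction": buffered statements plus
    // whatever is needed to undo them if the transaction aborts.
    public static class Txn implements Serializable {
        public final List<String> statements = new ArrayList<>();
    }

    public TwoPhaseCassandraSink() {
        super(TypeInformation.of(Txn.class).createSerializer(new ExecutionConfig()),
              VoidSerializer.INSTANCE);
    }

    @Override
    protected Txn beginTransaction() {
        return new Txn();
    }

    @Override
    protected void invoke(Txn txn, String value, Context context) {
        // Buffer (or tentatively write) the statement for this record.
        txn.statements.add("INSERT ... /* derived from " + value + " */");
    }

    @Override
    protected void preCommit(Txn txn) {
        // Flush anything pending; after this the checkpoint may complete.
    }

    @Override
    protected void commit(Txn txn) {
        // Make the writes final, e.g. execute the buffered statements or mark
        // tentative writes as committed.
    }

    @Override
    protected void abort(Txn txn) {
        // Roll back, e.g. apply the application-level undo log.
    }
}

With something along those lines and exactly-once checkpointing enabled,
Flink drives the pre-commit/commit cycle from its checkpoints, so a
savepoint taken for an upgrade wouldn't leave half-committed writes
behind.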

--
Niels


