Posted to user@flink.apache.org by "Sofer, Tovi " <to...@citi.com> on 2018/02/06 14:31:16 UTC

kafka as recovery only source

Hi group,

I wanted to get your suggestion on how to implement two requirements we have:

* The first is to read from an external message queue (JMS) with very low latency.

* The second is to support zero data loss: in case of restart and recovery, messages that were not checkpointed (and are not part of the state) must be replayed, which implies some kind of replayable source.

Because of the first requirement, we can't write JMS messages to Kafka first and only then read them from Kafka, because that would increase latency.
Instead, we thought of consuming the JMS messages and forwarding them both to the job and to a KafkaSink.
Then, in case of failure and recovery, we want to start in recovery mode and read messages from the offset matching the state/checkpoint.
How can this be done? We thought of somehow saving the last flushed Kafka offset in the state.
The problem is that this information is only available via a future/interceptor, and we don't know how to connect it to the state so that RecoverySource can use it.
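The crux of the question is that the Kafka write offset is only known once the asynchronous write completes. A minimal Python simulation (not Flink or Kafka code) of one way around it: if the operator that issues the writes also takes the snapshot, it can resolve its pending futures there and store the highest offset. `InMemoryTopic`, `ForwardingSource` and their methods are hypothetical names, and the single worker thread only imitates a producer's asynchronous acknowledgements.

```python
from concurrent.futures import ThreadPoolExecutor


class InMemoryTopic:
    """Hypothetical stand-in for a one-partition Kafka topic (not a Kafka client API)."""

    def __init__(self):
        self.records = []
        # A single worker thread imitates the producer's background I/O thread:
        # writes complete asynchronously, but in the order they were sent.
        self._io = ThreadPoolExecutor(max_workers=1)

    def _append(self, value):
        self.records.append(value)
        return len(self.records) - 1  # offset assigned to this record

    def send(self, value):
        # Like an asynchronous producer send: returns a future of the offset.
        return self._io.submit(self._append, value)


class ForwardingSource:
    """Hypothetical operator that forwards JMS messages downstream and to Kafka."""

    def __init__(self, topic):
        self.topic = topic
        self.pending = []             # futures of writes not yet covered by a snapshot
        self.last_written_offset = -1
        self.emitted = []             # stand-in for the downstream operators

    def process(self, msg):
        self.pending.append(self.topic.send(msg))  # do not wait for Kafka here
        self.emitted.append(msg)                    # forward to the job immediately

    def snapshot_state(self):
        # The offset is only reachable through the futures, so resolve them in
        # the same operator's snapshot (comparable to flushing the producer).
        for fut in self.pending:
            self.last_written_offset = max(self.last_written_offset, fut.result())
        self.pending.clear()
        return self.last_written_offset
```

Whatever `snapshot_state()` returns is the point a recovery path would resume from, starting at the next offset.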

So the current proposal looks something like this:

Happy path:
JMSQueue-> JMSConsumer -> JobMessageParser(and additional operators), KafkaSink
(Here we could maybe add a ProducerInterceptor that somehow saves the offset to state.)

Failure path (runs before the happy path to recover data):
RecoverySource-> JobMessageParser(and additional operators)
(Here we could maybe add a Queryable State client that reads the offsets from the other operator's state.)

Thanks,
Tovi


RE: kafka as recovery only source

Posted by "Sofer, Tovi " <to...@citi.com>.
Hi Fabian,

Thank you for the suggestion. We will consider it.
I would be glad to hear other ideas on how to handle such a requirement.

Thanks again,
Tovi
From: Fabian Hueske [mailto:fhueske@gmail.com]
Sent: Wednesday, 07 February 2018 11:47
To: Sofer, Tovi [ICG-IT] <ts...@imceu.eu.ssmb.com>
Cc: user@flink.apache.org; Tzu-Li (Gordon) Tai <tz...@apache.org>
Subject: Re: kafka as recovery only source




Re: kafka as recovery only source

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Tovi,

I've been thinking about this idea.
It might be possible, but I think you have to implement a custom source for
this.

I don't think it would work to have the JMSConsumer, KafkaSink, and
RecoverySource in separate operators because otherwise it would not be
possible to share the Kafka write offset of the recovery topic at
checkpoints.
QueryableState as you suggested only works for keyed state which is (AFAIK)
not available for sources.
The custom source operator would consume from JMS and directly write all
records to Kafka. In case of a recovery, it starts reading from Kafka and
continues with JMS once the recovery topic has been completely consumed.
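A minimal Python simulation of the single custom source described above, assuming one source instance and a one-partition recovery topic. The JMS queue is modelled as a `deque` and the recovery topic as a list; the class and method names are hypothetical, and in a real Flink job the offset returned by `snapshot_state()` would be kept in the source's checkpointed operator state.

```python
from collections import deque


class RecoveryOnlyKafkaSource:
    """Hypothetical single-instance source: JMS in, recovery topic written, replay on restore.

    `jms` stands in for the JMS queue (a deque) and `topic` for a one-partition
    Kafka recovery topic (a list); neither is a real client API.
    """

    def __init__(self, jms, topic, restored_offset=None):
        self.jms = jms
        self.topic = topic
        if restored_offset is None:
            # Fresh start: nothing to replay, so skip whatever the topic already holds.
            restored_offset = len(topic) - 1
        # Highest topic offset whose record has been emitted downstream.
        self.emitted_up_to = restored_offset
        # Recovery mode lasts until the topic is consumed up to its end at startup.
        self.replay_end = len(topic)

    def poll(self):
        """Return the next record to emit downstream, or None if there is none."""
        next_offset = self.emitted_up_to + 1
        if next_offset < self.replay_end:
            # Recovery mode: re-emit a record written after the last checkpoint.
            record = self.topic[next_offset]
        elif self.jms:
            # Normal mode: write the JMS message to the recovery topic, then emit it.
            # A real source would acknowledge the JMS message only after the Kafka
            # write is confirmed; otherwise a crash in between would lose it.
            record = self.jms.popleft()
            self.topic.append(record)
            next_offset = len(self.topic) - 1
        else:
            return None
        self.emitted_up_to = next_offset
        return record

    def snapshot_state(self):
        # In Flink this value would live in the source's checkpointed operator state.
        return self.emitted_up_to


# Usage: fresh start, one checkpoint, more records, then a simulated failure.
topic, jms = [], deque(["m1", "m2", "m3"])
src = RecoveryOnlyKafkaSource(jms, topic)
first = src.poll()                        # "m1"
ckpt = src.snapshot_state()               # 0: "m1" is covered by the checkpoint
after_ckpt = [src.poll(), src.poll()]     # "m2", "m3" emitted, then the job fails
jms.append("m4")
restored = RecoveryOnlyKafkaSource(jms, topic, restored_offset=ckpt)
replayed = []
record = restored.poll()
while record is not None:
    replayed.append(record)
    record = restored.poll()
# replayed == ["m2", "m3", "m4"]: replay from the topic first, then back to JMS
```

On restore, the source is created with the checkpointed offset, re-emits everything written after it, and only then falls back to JMS. Keeping the JMS read, the Kafka write and the offset in one operator is what lets the snapshot see a consistent offset.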
If you run the source in parallel, you need to handle the partitions of the
Kafka recovery topic.
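For a parallel source, one simple way to split the recovery topic, assuming each subtask writes only to the partitions it owns and the parallelism does not change between runs, is a modulo assignment with one checkpointed offset per partition. Both helpers below are hypothetical and operate on in-memory stand-ins rather than a Kafka client.

```python
def assign_partitions(num_partitions, parallelism, subtask_index):
    """Hypothetical modulo assignment: subtask i owns every partition p with p % parallelism == i."""
    return [p for p in range(num_partitions) if p % parallelism == subtask_index]


def replay_plan(partitions, checkpointed_offsets, owned):
    """List the (partition, offset) pairs a restored subtask must re-emit, in order.

    partitions: dict partition -> list of records (stand-in for the recovery topic)
    checkpointed_offsets: dict partition -> last offset emitted before the checkpoint;
        a partition missing from the checkpoint is replayed from its beginning.
    """
    plan = []
    for p in owned:
        start = checkpointed_offsets.get(p, -1) + 1
        plan.extend((p, offset) for offset in range(start, len(partitions[p])))
    return plan
```

Each subtask would snapshot only the offsets of its own partitions; changing the parallelism would require redistributing those entries, which this sketch does not attempt.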

I'm adding Gordon to this thread who might have additional comments or
ideas.

Best, Fabian

