Posted to user@flink.apache.org by Fabian Hueske <fh...@gmail.com> on 2018/10/01 09:16:43 UTC

Re: Flink Scheduler Customization

Hi Ananth,

You can certainly do this with Flink, but there are no built-in operators
for this.
What you probably want to do is compare the event's timestamp with the
current processing time and drop the record if it is too old.
If the timestamp is encoded in the record, you can do this in a
FilterFunction or a FlatMapFunction. If the timestamp is attached as an
event-time timestamp, you can access it in a ProcessFunction.
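
A minimal sketch of the ProcessFunction variant (my own example, not code
from this thread); MAX_LAG_MS is a made-up threshold and the function
assumes event-time timestamps were already assigned upstream:

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class DropStaleRecords<T> extends ProcessFunction<T, T> {

    // Hypothetical staleness threshold; tune it for your pipeline.
    private static final long MAX_LAG_MS = 5 * 60 * 1000L;

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        Long eventTime = ctx.timestamp(); // event-time timestamp, if assigned
        long now = ctx.timerService().currentProcessingTime();
        if (eventTime == null || now - eventTime <= MAX_LAG_MS) {
            out.collect(value); // forward fresh (or untimestamped) records
        }
        // stale records are silently dropped
    }
}

The FilterFunction variant is the same comparison, done against a timestamp
field read from the record itself.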

Best, Fabian

On Sat, Sep 29, 2018 at 21:11, Ananth Durai <vananth22@gmail.com> wrote:

>
> I'm writing a Flink connector to write a stream of events from Kafka to
> Elasticsearch. It is a typical metrics ingestion pipeline, where the
> latest metrics are preferred over stale data.
> What I mean by that: let's assume there was an outage of the Elasticsearch
> cluster for about 20 minutes, and all the metrics backlogged in Kafka
> during that period. Once ES is available, the Flink stream will resume from
> the last offset checkpoint (correctly so) and try to catch up. Instead, is
> there a way we can customize the Flink stream so that, if it detects a
> significant backlog, it skips over and consumes from the latest offset, and
> schedules a separate backfill task for the backlogged 20 minutes?
>
> Regards,
> Ananth.P,
>

Re: Flink Scheduler Customization

Posted by Hequn Cheng <ch...@gmail.com>.
Hi Ananth,

> if it detects a significant backlog, it skips over and consumes from the
> latest offset, and schedules a separate backfill task for the backlogged
> 20 minutes?

Fabian is right, there are no built-in operators for this.
If you don't care about watermarks, I think you can implement it with a
custom source that can either sleep or consume data within a given time range.

The job looks like:
Source1(s1) ->
                         Union -> Sink
Source2(s2) ->

The job works as follows:
- t1: s1 is working, s2 is sleeping
- t2: there is an outage of the Elasticsearch cluster
- t3: ES is available again. s1 resumes from t1 and ends at t3; s2 starts
from t3 directly.
- t4: s1 is sleeping, s2 is working

To achieve this, we also need a way to exchange progress between the two
sources, for example by syncing source status through an HBase or MySQL
table.
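
A rough skeleton of such a source (my own sketch, not code from this
thread). StatusStore is a hypothetical stand-in for the HBase/MySQL table,
and the actual Kafka-reading logic is elided:

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class RangeControlledSource extends RichSourceFunction<String> {

    // Hypothetical abstraction over the external table used to exchange
    // progress between the two sources.
    public interface StatusStore extends java.io.Serializable {
        boolean shouldConsume(String sourceId); // is this source active right now?
        long rangeStartMillis(String sourceId); // start of the assigned time range
        long rangeEndMillis(String sourceId);   // end of the assigned time range
    }

    private final String sourceId;
    private final StatusStore store;
    private volatile boolean running = true;

    public RangeControlledSource(String sourceId, StatusStore store) {
        this.sourceId = sourceId;
        this.store = store;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            if (!store.shouldConsume(sourceId)) {
                Thread.sleep(1000L); // sleep while the other source is working
                continue;
            }
            long start = store.rangeStartMillis(sourceId);
            long end = store.rangeEndMillis(sourceId);
            // ... read records from Kafka whose timestamps fall in [start, end)
            // and emit them via ctx.collect(...); omitted in this sketch.
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

s1 and s2 would then be two instances of this source, unioned in front of
the sink as in the diagram above, roughly (where store is some StatusStore
implementation):

env.addSource(new RangeControlledSource("s1", store))
   .union(env.addSource(new RangeControlledSource("s2", store)))
   .print(); // stand-in for the Elasticsearch sink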

Best, Hequn


On Mon, Oct 1, 2018 at 5:17 PM Fabian Hueske <fh...@gmail.com> wrote:

> Hi Ananth,
>
> You can certainly do this with Flink, but there are no built-in operators
> for this.
> What you probably want to do is compare the event's timestamp with the
> current processing time and drop the record if it is too old.
> If the timestamp is encoded in the record, you can do this in a
> FilterFunction or a FlatMapFunction. If the timestamp is attached as an
> event-time timestamp, you can access it in a ProcessFunction.
>
> Best, Fabian
>
> On Sat, Sep 29, 2018 at 21:11, Ananth Durai <vananth22@gmail.com> wrote:
>
>>
>> I'm writing a Flink connector to write a stream of events from Kafka to
>> Elasticsearch. It is a typical metrics ingestion pipeline, where the
>> latest metrics are preferred over stale data.
>> What I mean by that: let's assume there was an outage of the Elasticsearch
>> cluster for about 20 minutes, and all the metrics backlogged in Kafka
>> during that period. Once ES is available, the Flink stream will resume from
>> the last offset checkpoint (correctly so) and try to catch up. Instead, is
>> there a way we can customize the Flink stream so that, if it detects a
>> significant backlog, it skips over and consumes from the latest offset, and
>> schedules a separate backfill task for the backlogged 20 minutes?
>>
>> Regards,
>> Ananth.P,
>>