Posted to user@flink.apache.org by Andrew Roberts <ar...@fuze.com> on 2017/01/11 14:49:01 UTC

Fault tolerance guarantees of Elasticsearch sink in flink-elasticsearch2?

Hello,

I’m trying to understand the guarantees made by Flink’s Elasticsearch sink in terms of message delivery. According to (1), the ES sink offers at-least-once guarantees. This page doesn’t differentiate between flink-elasticsearch and flink-elasticsearch2, so I have to assume for the moment that they both offer that guarantee. However, a look at the code (2) shows that the invoke() method puts the record into a buffer, and then that buffer is flushed to Elasticsearch some time later.

It’s my understanding that Flink uses checkpoint “records” flowing past the sink as a means of guaranteeing that all records prior to the checkpoint have been received by the sink. I assume that the invoke() method returning is what Flink uses to decide whether a record has passed a sink, but here invoke() stashes the record in a buffer that doesn’t appear to participate in checkpointing anywhere.

Does the sink provided in flink-connector-elasticsearch2 guarantee at-least-once, and if it does, how does it reconstitute the buffer (so as not to lose records that have gone through the sink’s invoke() method but have not yet been transmitted to ES) in the case of the operator failing when the buffer is not empty?
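
To make the concern concrete, here is a minimal sketch of the failure mode described above. It is a toy model, not Flink or Elasticsearch code: `FakeStore`, `BufferingSink`, and the integer "checkpointed offset" are hypothetical stand-ins, and the only assumption carried over from the question is that a record counts as processed once invoke() returns.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model only: none of these types are Flink or Elasticsearch classes.
class BufferLossDemo {

    /** Stand-in for the external system (for example an Elasticsearch index). */
    static final class FakeStore {
        final List<String> docs = new ArrayList<>();
    }

    /** A sink that only buffers in invoke() and writes to the store on flush(). */
    static final class BufferingSink {
        private final FakeStore store;
        private final List<String> buffer = new ArrayList<>();

        BufferingSink(FakeStore store) { this.store = store; }

        void invoke(String record) { buffer.add(record); }

        void flush() {
            store.docs.addAll(buffer);
            buffer.clear();
        }
    }

    /** Runs the scenario and returns the records that never reached the store. */
    static List<String> lostRecords() {
        List<String> input = List.of("a", "b", "c");
        FakeStore store = new FakeStore();
        BufferingSink sink = new BufferingSink(store);

        sink.invoke("a");
        sink.flush();          // "a" is written
        sink.invoke("b");
        sink.invoke("c");      // "b" and "c" are only buffered

        // A checkpoint completes here. Since invoke() returned for all three
        // records, the source position recorded in the checkpoint is 3.
        int checkpointedOffset = 3;

        // The operator fails: the sink instance and its in-memory buffer are gone.
        sink = new BufferingSink(store);

        // Recovery replays from the checkpointed position, so nothing is re-sent.
        for (int i = checkpointedOffset; i < input.size(); i++) {
            sink.invoke(input.get(i));
        }
        sink.flush();

        List<String> lost = new ArrayList<>(input);
        lost.removeAll(store.docs);
        return lost;
    }

    public static void main(String[] args) {
        System.out.println("lost after recovery: " + lostRecords()); // prints [b, c]
    }
}
```

In this model the two buffered records are lost, so the sink would not be at-least-once unless the buffer is either flushed before the checkpoint completes or made part of the checkpointed state.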


(1) https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/guarantees.html
(2) https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java

Re: Fault tolerance guarantees of Elasticsearch sink in flink-elasticsearch2?

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Andrew!

There’s nothing special about extending the checkpointing interfaces for the SinkFunction; for Flink they’re essentially user functions that have user state to be checkpointed.
So yes, you’ll just implement it as you would for a flatMap / map / etc. function.
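
As a rough illustration of a sink treating its buffer as ordinary user state, here is a self-contained sketch. The `Snapshotable` interface is a local, hypothetical stand-in for a snapshot/restore contract and is not the real Flink interface; the store is just an in-memory list. In a real job the framework, not the test method, would call the snapshot and restore hooks.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model only: none of these types are Flink classes.
class CheckpointedSinkSketch {

    /** Local stand-in for a snapshot/restore contract on user state. */
    interface Snapshotable<S extends Serializable> {
        S snapshotState(long checkpointId);
        void restoreState(S state);
    }

    /** A buffering sink whose pending records are included in each snapshot. */
    static final class StatefulBufferingSink implements Snapshotable<ArrayList<String>> {
        private final List<String> store;              // stand-in for the external system
        private ArrayList<String> buffer = new ArrayList<>();

        StatefulBufferingSink(List<String> store) { this.store = store; }

        void invoke(String record) { buffer.add(record); }

        void flush() {
            store.addAll(buffer);
            buffer.clear();
        }

        @Override
        public ArrayList<String> snapshotState(long checkpointId) {
            // Copy, so later invoke() calls do not mutate the snapshot.
            return new ArrayList<>(buffer);
        }

        @Override
        public void restoreState(ArrayList<String> state) {
            buffer = new ArrayList<>(state);
        }
    }

    /** Simulates a failure after a checkpoint and returns the store contents. */
    static List<String> runWithFailure() {
        List<String> store = new ArrayList<>();
        StatefulBufferingSink sink = new StatefulBufferingSink(store);

        sink.invoke("a");
        sink.flush();                                   // "a" is written
        sink.invoke("b");
        sink.invoke("c");                               // still pending
        ArrayList<String> snapshot = sink.snapshotState(1L);

        // The operator fails; a fresh instance is restored from the snapshot.
        StatefulBufferingSink recovered = new StatefulBufferingSink(store);
        recovered.restoreState(snapshot);
        recovered.flush();                              // pending records are re-sent
        return store;
    }

    public static void main(String[] args) {
        System.out.println("store after recovery: " + runWithFailure()); // prints [a, b, c]
    }
}
```

Note that this only gives at-least-once behaviour: if the original instance had flushed "b" and "c" after the snapshot but before failing, the restored instance would write them a second time.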

Feel free to let me know if you bump into any questions.

Cheers,
Gordon


On January 16, 2017 at 11:37:30 PM, Andrew Roberts (aroberts@fuze.com) wrote:

Hi Gordon,

Thanks for getting back to me. The ticket looks good, but I’m going to need to do something similar for our homegrown sinks. It sounds like just having the affected sinks participate in checkpointing is enough of a solution - is there anything special about `SinkFunction[T]` extending `Checkpointed[S]`, or can I just implement it as I would for e.g. a mapping function?

Thanks,

Andrew



On Jan 13, 2017, at 4:34 PM, Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:

Hi Andrew,

Your observations are correct. Like you mentioned, the current problem revolves around how we deal with the pending buffered requests in accordance with Flink’s checkpointing.
I’ve filed a JIRA for this, as well as some thoughts for the solution in the description: https://issues.apache.org/jira/browse/FLINK-5487. What do you think?

Thank you for bringing this up! We should probably fix this soon.
There’s already an ongoing effort to fix some other aspects of proper at-least-once support in the Elasticsearch sinks, so I believe this will get attention very soon too.

Cheers,
Gordon




On January 11, 2017 at 3:49:06 PM, Andrew Roberts (aroberts@fuze.com) wrote:

I’m trying to understand the guarantees made by Flink’s Elasticsearch sink in terms of message delivery. According to (1), the ES sink offers at-least-once guarantees. This page doesn’t differentiate between flink-elasticsearch and flink-elasticsearch2, so I have to assume for the moment that they both offer that guarantee. However, a look at the code (2) shows that the invoke() method puts the record into a buffer, and then that buffer is flushed to Elasticsearch some time later.


Re: Fault tolerance guarantees of Elasticsearch sink in flink-elasticsearch2?

Posted by Andrew Roberts <ar...@fuze.com>.
Hi Gordon,

Thanks for getting back to me. The ticket looks good, but I’m going to need to do something similar for our homegrown sinks. It sounds like just having the affected sinks participate in checkpointing is enough of a solution - is there anything special about `SinkFunction[T]` extending `Checkpointed[S]`, or can I just implement it as I would for e.g. a mapping function?

Thanks,

Andrew





Re: Fault tolerance guarantees of Elasticsearch sink in flink-elasticsearch2?

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Andrew,

Your observations are correct. Like you mentioned, the current problem revolves around how we deal with the pending buffered requests in accordance with Flink’s checkpointing.
I’ve filed a JIRA for this, as well as some thoughts for the solution in the description: https://issues.apache.org/jira/browse/FLINK-5487. What do you think?
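
One way of handling pending buffered requests at checkpoint time is to drain the buffer inside the snapshot hook, so that a checkpoint can only complete once nothing is pending. The sketch below illustrates that idea under stated assumptions; it is not taken from the JIRA description, and `FlushingSink`, `snapshotState`, and `setStoreAvailable` are hypothetical names in a toy model rather than Flink or Elasticsearch API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model only: none of these types are Flink or Elasticsearch classes.
class FlushOnCheckpointSketch {

    static final class FlushingSink {
        private final List<String> store;               // stand-in for the external system
        private final List<String> buffer = new ArrayList<>();
        private boolean storeAvailable = true;

        FlushingSink(List<String> store) { this.store = store; }

        void setStoreAvailable(boolean available) { this.storeAvailable = available; }

        void invoke(String record) { buffer.add(record); }

        int pending() { return buffer.size(); }

        /**
         * Called when a checkpoint is taken. It returns only after every
         * buffered record has been written; otherwise it throws, which in this
         * model means the checkpoint does not complete.
         */
        void snapshotState(long checkpointId) {
            if (!storeAvailable) {
                throw new IllegalStateException(
                        "flush failed, checkpoint " + checkpointId + " must not complete");
            }
            store.addAll(buffer);
            buffer.clear();
        }
    }

    /** Number of records still buffered right after a successful checkpoint. */
    static int pendingAfterCheckpoint() {
        FlushingSink sink = new FlushingSink(new ArrayList<>());
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshotState(1L);
        return sink.pending();
    }

    /** True if the checkpoint fails and the record stays buffered while the store is down. */
    static boolean checkpointFailsWhenStoreDown() {
        FlushingSink sink = new FlushingSink(new ArrayList<>());
        sink.invoke("a");
        sink.setStoreAvailable(false);
        try {
            sink.snapshotState(2L);
            return false;
        } catch (IllegalStateException expected) {
            return sink.pending() == 1;
        }
    }

    public static void main(String[] args) {
        System.out.println("pending after checkpoint: " + pendingAfterCheckpoint());
        System.out.println("checkpoint fails when store is down: " + checkpointFailsWhenStoreDown());
    }
}
```

Compared with storing the buffer in the snapshot, this keeps the checkpointed state empty at the cost of making each checkpoint wait for the flush.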

Thank you for bringing this up! We should probably fix this soon.
There’s already an ongoing effort to fix some other aspects of proper at-least-once support in the Elasticsearch sinks, so I believe this will get attention very soon too.

Cheers,
Gordon



