Posted to user@flink.apache.org by Averell <lv...@gmail.com> on 2019/02/09 07:27:27 UTC

ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Hello,

I am trying to follow this Flink guide [1] to handle errors in
ElasticSearchSink by re-adding the failed messages to the queue.
The error scenarios that I hit and want to retry are: (i) a version conflict
on an UpdateRequest document and (ii) a lost connection to ElasticSearch.
Both errors are expected to be transient: (i) should be resolved by changing
the version, and (ii) should go away after a few seconds.
What I expect is that the messages get retried successfully.
What I actually got was: Flink seemed to get stuck on that (first) retry, my
flow queued up (backpressure is 1 everywhere), and all processing hung.

Here is my error handling code:

<code>
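	import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, RequestIndexer}
	import org.apache.flink.util.ExceptionUtils
	import org.elasticsearch.action.ActionRequest
	import org.elasticsearch.action.index.IndexRequest
	import org.elasticsearch.action.update.UpdateRequest

	// LOG is assumed to be an SLF4J logger defined in the enclosing class.
	private object MyElasticSearchFailureHandler extends ActionRequestFailureHandler {
		override def onFailure(actionRequest: ActionRequest, failure: Throwable,
				restStatusCode: Int, indexer: RequestIndexer): Unit = {
			if (ExceptionUtils.findThrowableWithMessage(failure, "version_conflict_engine_exception").isPresent) {
				actionRequest match {
					case s: UpdateRequest =>
						LOG.warn(s"Failed inserting record to ElasticSearch due to version conflict (${s.version()}). Retrying")
						LOG.warn(actionRequest.toString)
						// Bump the version and put the request back into the queue.
						indexer.add(s.version(s.version() + 1))
					case _ =>
						LOG.error("Failed inserting record to ElasticSearch due to version conflict. However, this is not an Update-Request. Don't know why.")
						LOG.error(actionRequest.toString)
						throw failure
				}
			} else if (restStatusCode == -1 && Option(failure.getMessage).exists(_.contains("Connection closed"))) {
				// getMessage may be null, hence the Option wrapper.
				LOG.warn(s"Retrying record: ${actionRequest.toString}")
				actionRequest match {
					case s: UpdateRequest => indexer.add(s)
					case s: IndexRequest => indexer.add(s)
					case _ => throw failure // avoid a MatchError for other request types
				}
			} else {
				// Pass the throwable to the logger so that the stack trace is printed.
				LOG.error(s"ELASTICSEARCH FAILED:\n    statusCode $restStatusCode\n    message: ${failure.getMessage}", failure)
				LOG.error(s"    DATA:\n    ${actionRequest.toString}")
				throw failure
			}
		}
	}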
</code>

Here is the extract from my task-manager logs:

2019-02-09 04:12:35.676 [I/O dispatcher 25] ERROR o.a.f.s.connectors.elasticsearch.ElasticsearchSinkBase  - Failed Elasticsearch bulk request: Connection closed
2019-02-09 04:12:35.678 [I/O dispatcher 25] WARN  c.n.c......sink.MyElasticSearchSink$  - Retrying record: update {[idx-20190208][_doc][doc_id_1549622700000], doc_as_upsert[true], doc[index {*[null][null][null]*, source[{...}]}], scripted_upsert[false], detect_noop[true]}
2019-02-09 04:12:54.242 [Sink: S3 - Historical (1/4)] INFO  o.a.flink.streaming.api.functions.sink.filesystem.Buckets  - Subtask 0 checkpointing for checkpoint with id=24 (max part counter=26).

And job-manager logs:
2019-02-09 03:59:37.880 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 23 for job 1a1438ca23387c4ef9a59ff9da6dafa1 (430392865 bytes in 307078 ms).
2019-02-09 04:09:30.970 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 24 @ 1549685370776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
2019-02-09 04:17:00.970 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 24 of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
2019-02-09 04:24:31.035 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 25 @ 1549686270776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.
2019-02-09 04:32:01.035 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint 25 of job 1a1438ca23387c4ef9a59ff9da6dafa1 expired before completing.
2019-02-09 04:39:30.961 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 26 @ 1549687170776 for job 1a1438ca23387c4ef9a59ff9da6dafa1.

Thanks and best regards,
Averell

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html#handling-failing-elasticsearch-requests



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Posted by Averell <lv...@gmail.com>.
Thank you Gordon and Ken.

My Flink job is now running well with 1.7.2 RC1, and failed ES requests are
retried successfully.

One more question: how can I limit the number of retries for different types
of errors in an ES bulk request? Is there any guideline on that?

My temporary solution is to use the version field of each ES request,
incrementing it every time I put the request back into the queue.
This has worked for me so far, but it doesn't look right.
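
One possible way to cap retries without reusing the version field is to keep
a small retry counter next to the failure handler. The following is only a
sketch in plain Scala: RetryBudget, FailureKind, VersionConflict and
ConnectionClosed are made-up names for illustration and are not part of Flink
or Elasticsearch, and the limits are arbitrary.

```scala
import scala.collection.mutable

// Hypothetical failure categories; extend as needed.
sealed trait FailureKind
case object VersionConflict extends FailureKind
case object ConnectionClosed extends FailureKind

// Hypothetical helper: counts retries per (request id, failure kind).
final class RetryBudget(limits: Map[FailureKind, Int]) extends Serializable {
  // Retries used so far; a missing key counts as zero.
  private val used =
    mutable.Map.empty[(String, FailureKind), Int].withDefaultValue(0)

  // Returns true and records the attempt if another retry is allowed.
  def tryAcquire(requestId: String, kind: FailureKind): Boolean = synchronized {
    val key = (requestId, kind)
    val allowed = used(key) < limits.getOrElse(kind, 0)
    if (allowed) used(key) = used(key) + 1
    allowed
  }

  // Drops all counters for a request, e.g. once it has been given up on.
  def release(requestId: String): Unit = synchronized {
    used.keys.filter(_._1 == requestId).toList.foreach(used.remove)
  }
}
```

Inside onFailure this could be used as "if (budget.tryAcquire(s.id(),
VersionConflict)) indexer.add(s) else throw failure". Two caveats under this
sketch: the counters are not part of Flink's checkpointed state, so they
reset when the job restarts, and because the handler is only called on
failures, counters for requests that eventually succeed are never released
unless something else cleans them up.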

Thanks and regards,
Averell




Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

@Averell <lv...@gmail.com> I renamed
`ElasticsearchFailureHandlerIndexer` to `BufferingNoOpRequestIndexer`,
which explains why you can't find it.

The voting thread for RC#1 of 1.7.2 can be found at [1].
The commits that fix the problem are d9c45af through 2f52227.

Cheers,
Gordon

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-7-2-release-candidate-1-td26882.html

On Thu, Feb 14, 2019 at 9:38 AM Ken Krugler <kk...@transpac.com>
wrote:

> Hi Averell,
>
>
> https://github.com/apache/flink/commit/35af99391dac431c85e30bcc98b89cba79bccfea#diff-51a12ea54593424e195dd5874309a08d
>
> …is the commit where Gordon made his changes for FLINK-11046
> <https://issues.apache.org/jira/browse/FLINK-11046>.
>
> The ElasticsearchFailureHandlerIndexer class was removed as part of the
> commit.
>
> — Ken

Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Posted by Ken Krugler <kk...@transpac.com>.
Hi Averell,

https://github.com/apache/flink/commit/35af99391dac431c85e30bcc98b89cba79bccfea#diff-51a12ea54593424e195dd5874309a08d

…is the commit where Gordon made his changes for FLINK-11046 <https://issues.apache.org/jira/browse/FLINK-11046>.

The ElasticsearchFailureHandlerIndexer class was removed as part of the commit.

— Ken


> On Feb 13, 2019, at 4:46 PM, Averell <lv...@gmail.com> wrote:
> 
> Hi Ken,
> 
> Thanks for that. But I could not find the changes included in Gordon's
> mentioned pull request in the repository you gave me (e.g: the new class
> /ElasticsearchFailureHandlerIndexer/). 
> I have found this folder
> https://dist.apache.org/repos/dist/dev/flink/flink-1.7.2-rc1/, but it also
> doesn't have that new class.
> Maybe Gordon meant 1.7.2 rc2?
> 
> Thanks and regards,
> Averell
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Posted by Averell <lv...@gmail.com>.
Hi Ken,

Thanks for that. However, I could not find the changes from the pull request
Gordon mentioned in the repository you gave me (e.g. the new class
ElasticsearchFailureHandlerIndexer).
I have also found this folder:
https://dist.apache.org/repos/dist/dev/flink/flink-1.7.2-rc1/, but it
doesn't have that new class either.
Maybe Gordon meant 1.7.2 RC2?

Thanks and regards,
Averell




Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Posted by Ken Krugler <kk...@transpac.com>.
Hi Averell,

You can get release candidates from the Apache release candidate maven repo. For 1.7.2, I think it’s in:

https://repository.apache.org/content/repositories/orgapacheflink-1206/

So just edit your pom.xml to add this repo to the <repositories> section.
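
As a rough sketch of that pom.xml change, assuming the staging repository URL
above is the right one for the 1.7.2 release candidate (the <id> value is
made up for illustration; any unique id should do):

```xml
<repositories>
  <repository>
    <!-- Hypothetical id; only the URL comes from this thread. -->
    <id>flink-1.7.2-rc-staging</id>
    <url>https://repository.apache.org/content/repositories/orgapacheflink-1206/</url>
  </repository>
</repositories>
```

Artifacts in a staging repository are normally published under the final
version number, so the Flink dependencies would presumably need their version
set to 1.7.2 to pick up the release candidate.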

— Ken

> On Feb 13, 2019, at 4:20 PM, Averell <lv...@gmail.com> wrote:
> 
> Hi Gordon,
> 
> Sorry for a noob question: How can I get the RC 1.7.2 build / code to build?
> I could not find any branch like that in Github.
> 
> Thanks and regards,
> Averell
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Posted by Averell <lv...@gmail.com>.
Hi Gordon,

Sorry for the noob question: how can I get the 1.7.2 RC build, or the code to
build it myself? I could not find a branch like that on GitHub.

Thanks and regards,
Averell




Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Thanks for testing it out.
It would be great to get your feedback on whether the release candidate for
1.7.2 fixes this for you.

On Wed, Feb 13, 2019 at 7:38 PM Averell <lv...@gmail.com> wrote:

> Thank you Gordon.
>
> That's my exact  problem. Will try the fix in 1.7.2 now.
>
> Thanks and regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Posted by Averell <lv...@gmail.com>.
Thank you Gordon.

That's exactly my problem. I will try the fix in 1.7.2 now.

Thanks and regards,
Averell




Re: ElasticSearchSink - retrying doesn't work in ActionRequestFailureHandler

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Averell,

This seems to be the bug that you encountered:
https://issues.apache.org/jira/browse/FLINK-11046.

Cheers,
Gordon
