Posted to user@flink.apache.org by Nicholas Walton <nw...@me.com> on 2019/11/19 08:38:02 UTC
Elastic search sink error handling
Hi,
I need help handling errors from the Elasticsearch sink, such as the one below:
2019-11-19 08:09:09,043 ERROR org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase - Failed Elasticsearch item request: [flink-index-deduplicated/nHWQM0XMSTatRri7zw_s_Q][[flink-index-deduplicated][13]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[75:108]: version conflict, document already exists (current version [1])]]
[flink-index-deduplicated/nHWQM0XMSTatRri7zw_s_Q][[flink-index-deduplicated][13]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[75:108]: version conflict, document already exists (current version [1])]]
at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
at org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
at java.lang.Thread.run(Thread.java:748)
The error is expected: I deliberately create documents with ids that may already exist, so that when I reload a batch that was only partially loaded, only the new documents are added. It also occurs when, after a timeout, I upload a document a second time to make sure it is definitely loaded and not lost.
The document is created as follows:
val json = new util.HashMap[String, Any]
json.put("arrayinstance", esid)
json.put("bearing", element._1)
json.put("sampleindex", element._2)
json.put("sample", element._3)
json.put("hashstring", element._4)
json.put("priorrepeats", element._5)
return Requests.indexRequest()
  .index("flink-index-deduplicated")
  .`type`("_doc")
  .id(element._1+":"+element._2)
  .create(true)
  .source(json)
}
My problem is: how can I catch the failure, recover and carry on? I have set a failure handler, shown below, which will need extending to handle the failure above.
esSinkBuilder.setFailureHandler(
  new ActionRequestFailureHandler() {
    // Scala's `override` keyword is sufficient; the Java @Override annotation is not needed
    @throws(classOf[Throwable])
    override def onFailure(action: ActionRequest, failure: Throwable, restStatusCode: Int, indexer: RequestIndexer): Unit = {
      if (ExceptionUtils.findThrowable(failure, classOf[EsRejectedExecutionException]).isPresent) {
        // full queue: put the request back so it is retried
        Job.LOG.info("ElasticSearch full queue; re-added document for indexing")
        indexer.add(action)
      } else if (ExceptionUtils.findThrowable(failure, classOf[ElasticsearchParseException]).isPresent) {
        // malformed document: drop it without failing the sink
        LOG.info("Malformed ElasticSearch document. Document dropped")
      } else if (ExceptionUtils.findThrowable(failure, classOf[java.net.SocketTimeoutException]).isPresent) {
        // timeout: put the request back so it is retried
        LOG.info("ElasticSearch document timeout; re-added document for indexing")
        indexer.add(action)
      }/* else if (ExceptionUtils.findThrowable(failure, classOf[]).isPresent) {
        LOG.info("ElasticSearch document duplicate; ignored document")
      } */else {
        // for all other failures, fail the sink
        // here the failure is simply rethrown, but users can also choose to throw custom exceptions
        Job.LOG.info(failure.getMessage)
        throw failure
      }
    }
  }
)
I have tried simply ignoring the failure by removing the "throw failure", but to no avail.
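One direction that might be worth trying, sketched here under assumptions rather than as a confirmed fix: the log above reports type=version_conflict_engine_exception wrapped in a generic ElasticsearchException, so matching on a specific exception class may not work. Elasticsearch reports version conflicts with HTTP status 409 (Conflict), so the restStatusCode argument of onFailure, or the failure message, could be used to recognise duplicates instead. The block below is a dependency-free model of that decision only. Decision, Retry, Drop, Fail and FailureClassifier are hypothetical names that belong to neither Flink nor Elasticsearch, and whether restStatusCode really arrives as 409 in this setup still needs to be verified.

```scala
// Hypothetical model of the failure-handling decision; it does not use the Flink API.
sealed trait Decision
case object Retry extends Decision // re-add the request to the indexer
case object Drop extends Decision  // log and ignore the request
case object Fail extends Decision  // rethrow and fail the sink

object FailureClassifier {
  // HTTP 409 Conflict: the status Elasticsearch uses for version conflicts
  val Conflict = 409
  // Marker text taken from the log message quoted earlier in this post
  val ConflictMarker = "version_conflict_engine_exception"

  // Walk the cause chain looking for an instance of the given class
  @annotation.tailrec
  private def hasCause(t: Throwable, cls: Class[_ <: Throwable]): Boolean =
    if (t == null) false
    else if (cls.isInstance(t)) true
    else hasCause(t.getCause, cls)

  def classify(failure: Throwable, restStatusCode: Int): Decision = {
    // getMessage may be null, so wrap it in an Option before searching it
    val mentionsConflict =
      Option(failure.getMessage).exists(_.contains(ConflictMarker))
    if (restStatusCode == Conflict || mentionsConflict) Drop
    else if (hasCause(failure, classOf[java.net.SocketTimeoutException])) Retry
    else Fail
  }
}
```

Inside onFailure, Drop would correspond to logging and returning without rethrowing, Retry to indexer.add(action), and Fail to throw failure, as in the existing branches.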