Posted to user@flink.apache.org by Nicholas Walton <nw...@me.com> on 2019/11/19 08:38:02 UTC

Elasticsearch sink error handling

Hi,

I need help with handling errors from the Elasticsearch sink, as shown below:

2019-11-19 08:09:09,043 ERROR org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase  - Failed Elasticsearch item request: [flink-index-deduplicated/nHWQM0XMSTatRri7zw_s_Q][[flink-index-deduplicated][13]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[75:108]: version conflict, document already exists (current version [1])]]
[flink-index-deduplicated/nHWQM0XMSTatRri7zw_s_Q][[flink-index-deduplicated][13]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[75:108]: version conflict, document already exists (current version [1])]]
	at org.elasticsearch.ElasticsearchException.innerFromXContent(ElasticsearchException.java:510)
	at org.elasticsearch.ElasticsearchException.fromXContent(ElasticsearchException.java:421)
	at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:135)
	at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:198)
	at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:653)
	at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$3(RestHighLevelClient.java:549)
	at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:580)
	at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:621)
	at org.elasticsearch.client.RestClient$1.completed(RestClient.java:375)
	at org.elasticsearch.client.RestClient$1.completed(RestClient.java:366)
	at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
	at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
	at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
	at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
	at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
	at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
	at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
	at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
	at java.lang.Thread.run(Thread.java:748)

The error is expected, since I am creating documents with duplicate ids. This happens either when I reload data from a previous batch that was only partially loaded, or when, after a timeout, I upload a document a second time to make sure it is definitely indexed and not lost.

The document is created as follows:

          // Build the document payload as a java.util.Map so the client can serialise it
          val json = new util.HashMap[String, Any]
          json.put("arrayinstance", esid)
          json.put("bearing", element._1)
          json.put("sampleindex", element._2)
          json.put("sample", element._3)
          json.put("hashstring", element._4)
          json.put("priorrepeats", element._5)
          // Deterministic id (bearing:sampleindex) plus create(true): a replayed
          // record hits the same id and the request fails with a version conflict
          // instead of overwriting the existing document
          return Requests.indexRequest()
            .index("flink-index-deduplicated")
            .`type`("_doc")
            .id(element._1 + ":" + element._2)
            .create(true)
            .source(json)
        }
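
For context, this request builder is plugged into esSinkBuilder roughly like this (a minimal sketch only: the host, the tuple element type, and the createIndexRequest helper name are placeholders standing in for my actual code, and I assume the elasticsearch6 connector):

    import java.util
    import org.apache.flink.api.common.functions.RuntimeContext
    import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
    import org.apache.http.HttpHost
    import org.elasticsearch.action.index.IndexRequest

    // (Int, Long, Double, String, Long) is a stand-in for my real element type:
    // (bearing, sampleindex, sample, hashstring, priorrepeats).
    // createIndexRequest stands in for the method whose tail is shown above.
    def createIndexRequest(element: (Int, Long, Double, String, Long)): IndexRequest = ???

    val esSinkBuilder = new ElasticsearchSink.Builder[(Int, Long, Double, String, Long)](
      util.Arrays.asList(new HttpHost("localhost", 9200, "http")),
      new ElasticsearchSinkFunction[(Int, Long, Double, String, Long)] {
        override def process(element: (Int, Long, Double, String, Long),
                             ctx: RuntimeContext,
                             indexer: RequestIndexer): Unit =
          // Hand the request to the sink's bulk processor
          indexer.add(createIndexRequest(element))
      }
    )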

My problem is: how can I catch this failure, recover, and carry on? I have set a failure handler, as below, which will need extending to handle the failure above.

    esSinkBuilder.setFailureHandler(
      new ActionRequestFailureHandler() {
        @throws(classOf[Throwable])
        override def onFailure(action: ActionRequest, failure: Throwable, restStatusCode: Int, indexer: RequestIndexer): Unit = {

          if (ExceptionUtils.findThrowable(failure, classOf[EsRejectedExecutionException]).isPresent) {
            // Bulk queue is full: retry by re-queuing the request
            Job.LOG.info("ElasticSearch full queue; re-added document for indexing")
            indexer.add(action)
          } else if (ExceptionUtils.findThrowable(failure, classOf[ElasticsearchParseException]).isPresent) {
            // Malformed document: nothing to retry, drop it
            Job.LOG.info("Malformed ElasticSearch document. Document dropped")
          } else if (ExceptionUtils.findThrowable(failure, classOf[java.net.SocketTimeoutException]).isPresent) {
            // Timeout: retry by re-queuing the request
            Job.LOG.info("ElasticSearch document timeout; re-added document for indexing")
            indexer.add(action)
          } /* else if (ExceptionUtils.findThrowable(failure, classOf[]).isPresent) {
            Job.LOG.info("ElasticSearch document duplicate; ignored document")
          } */ else {
            // for all other failures, fail the sink
            // here the failure is simply rethrown, but users can also choose to throw custom exceptions
            Job.LOG.info(failure.getMessage)
            throw failure
          }
        }
      }
    )
 
I have tried simply ignoring the failure by removing the "throw failure", but to no avail.
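
The commented-out branch above is where I think the duplicate case belongs, but I do not know what to match on. A sketch of what I have in mind is below, assuming the version conflict surfaces either as a 409 CONFLICT rest status code or as an ElasticsearchException whose message mentions version_conflict_engine_exception (Job.LOG is the same logger used in my handler above). I would be grateful if someone could confirm whether this is the right way to detect it:

    import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, RequestIndexer}
    import org.apache.flink.util.ExceptionUtils
    import org.elasticsearch.ElasticsearchException
    import org.elasticsearch.action.ActionRequest
    import org.elasticsearch.rest.RestStatus

    // Sketch only: treat a version conflict on a create-only request as
    // "document already indexed" and drop it, instead of failing the sink
    val duplicateTolerantHandler = new ActionRequestFailureHandler {
      @throws(classOf[Throwable])
      override def onFailure(action: ActionRequest, failure: Throwable, restStatusCode: Int, indexer: RequestIndexer): Unit = {
        val isVersionConflict =
          restStatusCode == RestStatus.CONFLICT.getStatus ||
            (ExceptionUtils.findThrowable(failure, classOf[ElasticsearchException]).isPresent &&
              failure.getMessage != null &&
              failure.getMessage.contains("version_conflict_engine_exception"))

        if (isVersionConflict) {
          // The id already exists, which is expected when a record is re-sent;
          // do not re-add the request and do not rethrow, so the sink carries on
          Job.LOG.info("ElasticSearch document duplicate; ignored document")
        } else {
          // Anything unexpected: rethrow so the job fails rather than silently losing data
          throw failure
        }
      }
    }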