Posted to user@flink.apache.org by Nicholas Walton <nw...@me.com> on 2019/10/17 09:19:22 UTC

ElasticSearch failing when parallelised

Hi,

I’m running Elasticsearch as a sink for a batch job that processes a CSV file of 6.2 million rows, each row containing 181 numeric values. It quite happily processes a small sample of around 2,000 rows, running each column through its own parallel pipeline, keyed by column number.
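For context, the pipeline shape is roughly this (a simplified, untested sketch with illustrative names and paths, not my exact code):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(8)

// Each CSV line is split into one (column, value) record per field, so keying
// by the column number gives each of the 181 columns its own keyed sub-stream.
val samples = env.readTextFile("/path/to/samples.csv")
  .flatMap { line: String =>
    line.split(",").zipWithIndex.map { case (v, col) => (col, v.toDouble) }
  }
  .keyBy(_._1) // 181 keys, spread across the parallel subtasks

The real records also carry a sample index and a hash string, as in the sink code below.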

However, once I scale up to the full data set, with parallelism set higher than one (typically eight), Elasticsearch fails after a while as below:

2019-10-17 09:36:09,550 ERROR org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase  - Failed Elasticsearch bulk request: request retries exceeded max retry timeout [30000]
java.io.IOException: request retries exceeded max retry timeout [30000]
	at org.elasticsearch.client.RestClient$1.retryIfPossible(RestClient.java:411)
	at org.elasticsearch.client.RestClient$1.failed(RestClient.java:398)
	at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137)
	at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419)
	at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:375)
	at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
	at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
	at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
	at java.lang.Thread.run(Thread.java:745)
2019-10-17 09:36:09,576 INFO  org.example.Job$                                              - Failed ElasticSearch document. Exception rethrown
2019-10-17 09:36:09,624 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:381)
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:343)
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:380)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: request retries exceeded max retry timeout [30000]
	at org.elasticsearch.client.RestClient$1.retryIfPossible(RestClient.java:411)
	at org.elasticsearch.client.RestClient$1.failed(RestClient.java:398)
	at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137)
	at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419)
	at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:375)
	at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
	at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
	at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
	... 1 more
2019-10-17 09:36:09,629 INFO  org.apache.flink.runtime.taskmanager.Task                     - Window(GlobalWindows(), CountTrigger, CountEvictor, ScalaProcessWindowFunctionWrapper) -> Map -> Map -> Map -> Sink: Unnamed (2/8) (cb74c5d1c9323000fae9488d7d256468) switched from RUNNING to FAILED.
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:381)
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:386)
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at org.example.signalRecognitionWindowFunction.process(signalRecognitionWindowFunction.scala:42)
	at org.example.signalRecognitionWindowFunction.process(signalRecognitionWindowFunction.scala:8)
	at org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
	at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.emitWindowContents(EvictingWindowOperator.java:359)
	at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:218)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: request retries exceeded max retry timeout [30000]
	at org.elasticsearch.client.RestClient$1.retryIfPossible(RestClient.java:411)
	at org.elasticsearch.client.RestClient$1.failed(RestClient.java:398)
	at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137)
	at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419)
	at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:375)
	at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
	at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
	at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
	... 1 more


I’m running Flink 1.8.1 and version 1.9.0 of the ES connector, as a standalone cluster on a 16 GB MacBook Pro. Activity Monitor shows no sign of exhausting resources (CPU or memory), and data is being written to disk at a fairly low 10-20 MB/s, well under the disk’s maximum write rate of 100+ MB/s. The Elasticsearch log shows nothing except occasional INFO messages about GC.

The code relevant to the sink is below, with imports included for completeness (ElasticsearchSink here comes from the elasticsearch6 connector):

import java.util
import java.util.ArrayList

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.flink.util.ExceptionUtils
import org.apache.http.HttpHost
import org.elasticsearch.ElasticsearchParseException
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException

val httpHosts = new ArrayList[HttpHost]
httpHosts.add(new HttpHost("localhost", 9200, "http"))
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http")) // same local node, listed twice

val esSinkBuilder = new ElasticsearchSink.Builder[(Int, Long, Double, String)](
  httpHosts,
  new ElasticsearchSinkFunction[(Int, Long, Double, String)] {

    // Build one index request per record: (bearing, sampleindex, sample, hashstring)
    def createIndexRequest(element: (Int, Long, Double, String)): IndexRequest = {
      val json = new util.HashMap[String, Any]

      json.put("arrayinstance", "beamarraytest")
      json.put("bearing", element._1)
      json.put("sampleindex", element._2)
      json.put("sample", element._3)
      json.put("hashstring", element._4)
      Requests.indexRequest()
        .index("flink-index")
        .`type`("flink-type")
        .source(json)
    }

    override def process(element: (Int, Long, Double, String), ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      indexer.add(createIndexRequest(element))
    }
  }
)

esSinkBuilder.setFailureHandler(
  new ActionRequestFailureHandler() {
    @throws(classOf[Throwable])
    override def onFailure(action: ActionRequest, failure: Throwable, restStatusCode: Int, indexer: RequestIndexer): Unit = {

      if (ExceptionUtils.findThrowable(failure, classOf[EsRejectedExecutionException]).isPresent) {
        // The node's bulk queue was full: re-queue the document for another attempt
        LOG.info("ElasticSearch full queue; re-added document for indexing")
        indexer.add(action)
      } else if (ExceptionUtils.findThrowable(failure, classOf[ElasticsearchParseException]).isPresent) {
        // Malformed document: drop it rather than fail the job
        LOG.info("Malformed ElasticSearch document. Document dropped")
      } else {
        // Anything else (including the retry-timeout IOException above) fails the sink
        LOG.info("Failed ElasticSearch document. Exception rethrown")
        throw failure
      }
    }
  }
)

esSinkBuilder.setBulkFlushMaxActions(10000) // flush a bulk request every 10,000 buffered actions

signalFourBuckets.addSink(esSinkBuilder.build)
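Two things I’m considering trying, though I haven’t tested either yet. First, the 30000 in the error looks like the REST client’s default maximum retry timeout of 30 s. Assuming the elasticsearch6 connector, the builder exposes a RestClientFactory hook where that timeout could be raised (setMaxRetryTimeoutMillis is the ES 6.x RestClientBuilder setter; this would need to be set before build() is called):

import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory
import org.elasticsearch.client.RestClientBuilder

esSinkBuilder.setRestClientFactory(new RestClientFactory {
  override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
    // Let bulk requests retry for up to two minutes before the sink gives up
    restClientBuilder.setMaxRetryTimeoutMillis(120000)
  }
})

Second, 10,000 actions per bulk request may simply be too large a batch for a single local node to keep up with at parallelism eight. A sketch of smaller bulks with the connector’s built-in exponential backoff, instead of re-adding rejected documents myself (again untested; the values are guesses):

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase

esSinkBuilder.setBulkFlushMaxActions(1000)   // smaller bulks
esSinkBuilder.setBulkFlushInterval(1000)     // flush at least once a second
esSinkBuilder.setBulkFlushBackoff(true)
esSinkBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL)
esSinkBuilder.setBulkFlushBackoffRetries(5)
esSinkBuilder.setBulkFlushBackoffDelay(1000) // initial backoff delay in ms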

Is my code at fault, or is the sink simply not able to keep up with the flow rate? Advice of any kind would be very gratefully received.

TIA

Nick Walton