Posted to user@beam.apache.org by Mohil Khare <mo...@prosimo.io> on 2020/08/10 04:03:43 UTC

ElasticsearchIO: Connection closed and Cannot get Elasticsearch version exceptions

Hello All,

I am using the Apache Beam Java SDK 2.19 and ElasticsearchIO with Elasticsearch 6.x.

I keep getting the following exception while writing streaming logs to ES:

java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:194)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:165)
    at org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
    at org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
    at org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:125)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1266)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:152)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1073)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown Source)
    at org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:80)
    at org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:62)
    at org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95)
    at org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:264)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:86)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:183)

Caused by: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:1472)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup(ElasticsearchIO.java:1270)

Caused by: org.apache.http.ConnectionClosedException: Connection is closed
    at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:813)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:248)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)

Caused by: org.apache.http.ConnectionClosedException: Connection is closed
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:356)
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:121)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
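Reading the causes from the bottom up: WriteFn.setup calls ElasticsearchIO.getBackendVersion, which issues a request through the Elasticsearch RestClient, and the HTTP connection was closed before a response arrived. The "Cannot get Elasticsearch version" message therefore looks like a wrapper around a connection failure rather than a version mismatch. One way to separate a network/auth problem from a Beam problem is to send a similar request from the same network as the Dataflow workers. The sketch below is JDK-only and assumes the version check amounts to a GET on the cluster's root endpoint, whose JSON response carries version.number (as Elasticsearch's root endpoint does). EsVersionProbe, probe and parseMajorVersion are hypothetical names, not part of Beam or the Elasticsearch client.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: a standalone connectivity/version probe, not Beam code.
final class EsVersionProbe {

    // Captures the major version from the root endpoint's "version.number",
    // e.g. {"version":{"number":"6.8.13",...}}. A regex keeps this JDK-only;
    // real code should use a JSON parser.
    private static final Pattern VERSION_NUMBER = Pattern.compile(
        "\"version\"\\s*:\\s*\\{[^}]*?\"number\"\\s*:\\s*\"(\\d+)\\.");

    /** Returns the major version found in a root-endpoint response body. */
    static int parseMajorVersion(String body) {
        Matcher m = VERSION_NUMBER.matcher(body);
        if (!m.find()) {
            throw new IllegalArgumentException("No version.number in response: " + body);
        }
        return Integer.parseInt(m.group(1));
    }

    /**
     * Sends GET / (with basic auth when a username is given) and returns the
     * major version. Connection resets, timeouts and non-200 responses
     * surface as IOException.
     */
    static int probe(String endpoint, String username, String password, int timeoutMillis)
            throws IOException {
        String base = endpoint.endsWith("/") ? endpoint : endpoint + "/";
        HttpURLConnection conn = (HttpURLConnection) new URL(base).openConnection();
        try {
            conn.setRequestMethod("GET");
            conn.setConnectTimeout(timeoutMillis);
            conn.setReadTimeout(timeoutMillis);
            if (username != null) {
                String token = Base64.getEncoder().encodeToString(
                    (username + ":" + password).getBytes(StandardCharsets.UTF_8));
                conn.setRequestProperty("Authorization", "Basic " + token);
            }
            int status = conn.getResponseCode();
            if (status != HttpURLConnection.HTTP_OK) {
                throw new IOException("HTTP " + status + " from " + base);
            }
            // Java 8 compatible read loop (InputStream.readAllBytes needs Java 9).
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (InputStream in = conn.getInputStream()) {
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
            }
            return parseMajorVersion(new String(out.toByteArray(), StandardCharsets.UTF_8));
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.err.println("usage: EsVersionProbe <endpoint> [username password]");
            return;
        }
        String user = args.length >= 3 ? args[1] : null;
        String pass = args.length >= 3 ? args[2] : null;
        System.out.println("Elasticsearch major version: " + probe(args[0], user, pass, 10_000));
    }
}
```

Run it with the same endpoint and credentials as the pipeline, from a machine in the workers' network. If it also fails intermittently with a reset or timeout, the problem is on the network or cluster side rather than in the pipeline code.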


My code to write to ES is pretty simple:

input
    // Serialize each AppAccessSessionLog POJO to a JSON document string.
    .apply("Convert_PCollection<POJO:AppAccessSessionLog> to PCollection<String>",
        new AppAccessSessionLogToString())
    .apply("Write_To_Elastic_Search", ElasticsearchIO.write()
        .withConnectionConfiguration(
            ElasticsearchIO.ConnectionConfiguration.create(elasticsearchEndPoint, "indexname", "_doc")
                .withUsername(username)
                .withPassword(password))
        // Retry failed bulk requests: at most MAX_ATTEMPTS attempts within 60 s.
        .withRetryConfiguration(
            ElasticsearchIO.RetryConfiguration.create(MAX_ATTEMPTS, Duration.millis(60000)))
        // Send partial-update (upsert) requests keyed by the id below.
        .withUsePartialUpdate(true)
        // Take the document _id from its "id" field.
        .withIdFn(new ElasticExtractNameFieldIdFn("id")));
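The retry configuration above bounds retries in two ways: by attempt count (MAX_ATTEMPTS) and by total time (60 s). As I understand the ElasticsearchIO Javadoc, withRetryConfiguration targets failed batch (bulk) calls, so it may not cover the version check that fails in setup here; treat that as an assumption to verify. To make the two limits concrete, here is a generic JDK-only sketch of capped exponential backoff. BoundedRetry.call is a hypothetical helper for illustration, not Beam's RetryConfiguration implementation.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

// Hypothetical helper illustrating "max attempts + max duration" retries.
final class BoundedRetry {

    /**
     * Calls action until it succeeds, giving up after maxAttempts calls or
     * once maxDuration has elapsed, whichever comes first. The wait between
     * attempts starts at initialBackoff and doubles each time.
     */
    static <T> T call(Callable<T> action, int maxAttempts, Duration maxDuration,
                      Duration initialBackoff) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long deadline = System.nanoTime() + maxDuration.toNanos();
        long backoffMillis = initialBackoff.toMillis();
        for (int attempt = 1; ; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                long remainingMillis = (deadline - System.nanoTime()) / 1_000_000;
                if (attempt >= maxAttempts || remainingMillis <= 0) {
                    throw e; // budget exhausted: surface the last failure
                }
                // Never sleep past the deadline.
                Thread.sleep(Math.min(backoffMillis, remainingMillis));
                // Double the wait, capped at the total budget to avoid overflow.
                backoffMillis = Math.min(backoffMillis * 2, maxDuration.toMillis());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated flaky call: fails twice, then succeeds.
        String result = call(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new java.io.IOException("Connection is closed");
            }
            return "ok";
        }, 5, Duration.ofSeconds(60), Duration.ofMillis(100));
        System.out.println(result + " after " + calls[0] + " attempts"); // prints "ok after 3 attempts"
    }
}
```

Clamping each sleep to the remaining budget keeps the loop from sleeping past the deadline, but a single slow attempt can still overrun it, so the attempt itself should carry its own timeout.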


Any idea how to resolve this?

Thanks and Regards
Mohil