Posted to user@beam.apache.org by Mohil Khare <mo...@prosimo.io> on 2020/08/10 04:03:43 UTC
ElasticsearchIO: Connection closed and Cannot get Elasticsearch version exceptions
Hello All,
I am using the Apache Beam Java SDK 2.19 with ElasticsearchIO against Elasticsearch 6.x.
I keep getting the following exception while writing streaming logs to ES:
java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:194)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:165)
    at org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
    at org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
    at org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:125)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1266)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:152)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1073)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown Source)
    at org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:80)
    at org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:62)
    at org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95)
    at org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:264)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:86)
    at org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$1.typedApply(IntrinsicMapTaskExecutorFactory.java:183)
Caused by: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:1472)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup(ElasticsearchIO.java:1270)
Caused by: org.apache.http.ConnectionClosedException: Connection is closed
    at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:813)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:248)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
Caused by: org.apache.http.ConnectionClosedException: Connection is closed
    at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.endOfInput(HttpAsyncRequestExecutor.java:356)
    at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:261)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
    at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:121)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
My code to write to ES is pretty simple:
input
    // Serialize each AppAccessSessionLog POJO to a JSON string.
    .apply("Convert_PCollection<POJO:AppAccessSessionLog> to PCollection<String>",
        new AppAccessSessionLogToString())
    // Write the JSON documents to Elasticsearch as partial updates keyed by "id".
    .apply("Write_To_Elastic_Search", ElasticsearchIO.write()
        .withConnectionConfiguration(
            ElasticsearchIO.ConnectionConfiguration.create(elasticsearchEndPoint, "indexname", "_doc")
                .withUsername(username)
                .withPassword(password))
        .withRetryConfiguration(
            ElasticsearchIO.RetryConfiguration.create(MAX_ATTEMPTS, Duration.millis(60000)))
        .withUsePartialUpdate(true)
        .withIdFn(new ElasticExtractNameFieldIdFn("id"))
    );
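The trace shows the failure happening in WriteFn.setup, inside ElasticsearchIO.getBackendVersion, when RestClient.performRequest finds the connection closed, so it occurs before any document is written. One way to narrow this down might be to issue a similar request from outside Beam. The following is only a rough sketch under assumptions: it uses the plain JDK HttpURLConnection rather than the Elasticsearch RestClient that Beam uses, it assumes the check is a GET on the cluster root endpoint (whose JSON response carries a version.number field), and the class name EsVersionCheck, its method names, and the 5-second timeouts are all hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Beam or the Elasticsearch client.
final class EsVersionCheck {

    // Finds the "number" field inside the "version" object of the root response.
    private static final Pattern VERSION_NUMBER = Pattern.compile(
        "\"version\"\\s*:\\s*\\{[^}]*?\"number\"\\s*:\\s*\"([^\"]+)\"");

    // Builds an HTTP Basic Authorization header value from the credentials.
    static String basicAuthHeader(String username, String password) {
        String credentials = username + ":" + password;
        return "Basic " + Base64.getEncoder()
            .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    // Extracts version.number from the JSON body, or throws if it is absent.
    static String extractVersionNumber(String json) {
        Matcher matcher = VERSION_NUMBER.matcher(json);
        if (!matcher.find()) {
            throw new IllegalArgumentException("No version.number field in response");
        }
        return matcher.group(1);
    }

    // Sends GET <endpoint> with Basic auth and returns the response body.
    // Throws IOException on connection problems or HTTP error statuses.
    static String fetchRoot(String endpoint, String username, String password)
            throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5000); // assumed timeout, in milliseconds
        conn.setReadTimeout(5000);    // assumed timeout, in milliseconds
        conn.setRequestProperty("Authorization", basicAuthHeader(username, password));
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int read;
            while ((read = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, read);
            }
            return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 3) {
            System.err.println("usage: EsVersionCheck <endpoint> <username> <password>");
            return;
        }
        String body = fetchRoot(args[0], args[1], args[2]);
        System.out.println("Elasticsearch version: " + extractVersionNumber(body));
    }
}
```

If a check like this succeeds from a machine on the same network as the Dataflow workers but the pipeline still fails, the problem is more likely in how the workers reach the endpoint (scheme, port, firewall, or proxy) than in the credentials or the cluster itself.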
Any idea how to resolve this?
Thanks and Regards
Mohil