Posted to user@beam.apache.org by Mohil Khare <mo...@prosimo.io> on 2021/03/08 00:12:08 UTC
Re: Error During ElasticsearchIO read
Hello,
I think the "Operation timed out" error was a networking issue (curl was
also failing, which I should have checked first). When I tried a different
Elasticsearch cluster, that error went away.
However, I am still unable to read because of the following error:
{"error": {
    "root_cause": [{"type": "illegal_argument_exception", "reason": "Cannot parse scroll id"}],
    "type": "illegal_argument_exception",
    "reason": "Cannot parse scroll id",
    "caused_by": {
        "type": "array_index_out_of_bounds_exception",
        "reason": "arraycopy: last source index 1668257 out of bounds for byte[3]"}},
 "status": 400}
    at org.elasticsearch.client.RestClient.convertResponse(RestClient.java:283)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:261)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
    at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchReader.close(ElasticsearchIO.java:918)
    at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.close(WorkerCustomSources.java:632)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader$NativeReaderIterator.abort(NativeReader.java:179)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.abort(ReadOperation.java:371)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.abort(ReadOperation.java:256)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:91)
Any help would be greatly appreciated.
Thanks and Regards
Mohil
On Sun, Mar 7, 2021 at 2:58 PM Mohil Khare <mo...@prosimo.io> wrote:
> Hello ElasticsearchIO and Beam users/developers,
>
> I am on Beam 2.23.0 and Elasticsearch 6.8.
>
> I have been using ElasticsearchIO.write() successfully.
> For the first time, I am trying to use ElasticsearchIO.read() because I have
> a use case where I want to read data from one Elasticsearch cluster,
> modify it, and then write it to another Elasticsearch cluster.
>
> My read transform is very simple:
>
>     Pipeline p = input.getPipeline();
>
>     return p
>         .apply("Read_From_ES", ElasticsearchIO.read()
>             .withQuery(query)
>             .withBatchSize(50)
>             .withScrollKeepalive("3m")
>             .withConnectionConfiguration(
>                 // create(addresses, index, type)
>                 ElasticsearchIO.ConnectionConfiguration.create(
>                         esConnectionParams.getElasticsearchEndpoints(), indexName, "_doc")
>                     // timeouts are in milliseconds (240000 ms = 4 minutes)
>                     .withConnectTimeout(240000)
>                     .withSocketTimeout(240000)
>                     .withUsername(esConnectionParams.getElasticsearchUsername())
>                     .withPassword(esConnectionParams.getElasticsearchPassword())));
>
> But I am unable to submit the job successfully; I get the following exception
> in my read transform:
>
> WARNING: Size estimation of the source failed: org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource@7c974942
> java.net.ConnectException: Operation timed out
>     at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:823)
>     at org.elasticsearch.client.RestClient.performRequest(RestClient.java:248)
>     at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
>     at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.getStats(ElasticsearchIO.java:797)
>     at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.getEstimatedSizeBytes(ElasticsearchIO.java:700)
>     at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:77)
>     at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:51)
>     at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:38)
>     at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:35)
>     at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:484)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463)
>     at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:423)
>     at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:182)
>     at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:888)
>     at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:194)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
>     at io.prosimo.analytics.beam.datacopy.DataCopy.runDataCopy(DataCopy.java:43)
>     at io.prosimo.analytics.beam.datacopy.DataCopy.main(DataCopy.java:109)
> Caused by: java.net.ConnectException: Operation timed out
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>     at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:174)
>     at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:148)
>     at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
>     at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
>     at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
>     at java.lang.Thread.run(Thread.java:748)
>
> I tried various values for connectTimeout, socketTimeout, batchSize and
> scrollKeepalive, but the issue persists.
>
> Any help would be greatly appreciated.
>
> Thanks and Regards
> Mohil
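The size-estimation failure quoted above happens during job submission. The trace runs from DataCopy.main through Pipeline.run and DataflowRunner.run into getEstimatedSizeBytes. At that point it is the machine launching the pipeline, not a Dataflow worker, that has to reach the cluster. A rough Java stand-in for the curl check mentioned in the reply is a plain TCP connect with a timeout. This is a minimal sketch. It only shows whether a TCP connection can be opened, not whether HTTP, TLS or authentication work. The class name, the default host and port 9200 (the usual Elasticsearch HTTP port) are assumptions; replace them with your own endpoint.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks raw TCP reachability of an Elasticsearch endpoint
// from the machine that submits the pipeline.
final class EsReachability {

    // Returns true if a TCP connection to host:port is established within timeoutMs.
    // Timeouts, refused connections and unresolvable hosts all surface as IOException.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder endpoint; pass your own host and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9200;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 5000));
    }
}
```

If this returns false on the launching machine while the same endpoint works elsewhere, raising withConnectTimeout or withSocketTimeout is unlikely to help. The problem is then network access (firewall, VPC or allow-list) rather than the ElasticsearchIO settings.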