Posted to user@beam.apache.org by Mohil Khare <mo...@prosimo.io> on 2021/03/08 00:12:08 UTC

Re: Error During ElasticsearchIO read

Hello,
I guess the "Operation timed out" error was more of a networking issue
(curl was also failing, which I should have tried first). When I tried a
different Elasticsearch cluster, I didn't get that error message.
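
In case it helps anyone else, this is roughly the connectivity check I
should have started with: a minimal sketch against the low-level REST
client (host, port, and scheme are placeholders, and auth is omitted):

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class EsHealthCheck {
  public static void main(String[] args) throws Exception {
    // Placeholder endpoint -- substitute the real cluster address and credentials.
    try (RestClient client = RestClient.builder(new HttpHost("es-host", 9200, "https")).build()) {
      // A fast request that only proves the cluster is reachable from this machine.
      Response response = client.performRequest(new Request("GET", "/_cluster/health"));
      System.out.println(response.getStatusLine());
    }
  }
}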

However, I am still unable to read, because of the following:

{
  "error": {
    "root_cause": [
      {"type": "illegal_argument_exception", "reason": "Cannot parse scroll id"}
    ],
    "type": "illegal_argument_exception",
    "reason": "Cannot parse scroll id",
    "caused_by": {
      "type": "array_index_out_of_bounds_exception",
      "reason": "arraycopy: last source index 1668257 out of bounds for byte[3]"
    }
  },
  "status": 400
}

at org.elasticsearch.client.RestClient.convertResponse (RestClient.java:283)
at org.elasticsearch.client.RestClient.performRequest (RestClient.java:261)
at org.elasticsearch.client.RestClient.performRequest (RestClient.java:235)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchReader.close (ElasticsearchIO.java:918)
at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.close (WorkerCustomSources.java:632)
at org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader$NativeReaderIterator.abort (NativeReader.java:179)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.abort (ReadOperation.java:371)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.abort (ReadOperation.java:256)
at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute (MapTaskExecutor.java:91)
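
If I read the trace right, the 400 comes back when the reader closes and
tries to clear its scroll context, i.e. the server is handed a scroll_id
it cannot decode. To convince myself I understand the scroll lifecycle
that ElasticsearchIO drives, I reproduced the same sequence with the
low-level REST client (a sketch; the host, index name, and the crude
scroll_id extraction are placeholders):

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ScrollLifecycle {
  public static void main(String[] args) throws Exception {
    try (RestClient client = RestClient.builder(new HttpHost("es-host", 9200, "https")).build()) {
      // 1. Initial search opens a scroll context (what the reader does on start).
      Request search = new Request("GET", "/my-index/_search");
      search.addParameter("scroll", "3m");
      search.setJsonEntity("{\"size\": 50, \"query\": {\"match_all\": {}}}");
      String body = EntityUtils.toString(client.performRequest(search).getEntity());

      // Crude extraction for the sketch; real code should use a JSON parser.
      Matcher m = Pattern.compile("\"_scroll_id\"\\s*:\\s*\"([^\"]+)\"").matcher(body);
      if (!m.find()) throw new IllegalStateException("no _scroll_id in response");
      String scrollId = m.group(1);

      // 2. Continuation requests reuse the scroll id (one per batch).
      Request next = new Request("GET", "/_search/scroll");
      next.setJsonEntity("{\"scroll\": \"3m\", \"scroll_id\": \"" + scrollId + "\"}");
      client.performRequest(next);

      // 3. On close, the scroll context is cleared -- the step that fails above.
      Request clear = new Request("DELETE", "/_search/scroll");
      clear.setJsonEntity("{\"scroll_id\": [\"" + scrollId + "\"]}");
      client.performRequest(clear);
    }
  }
}

If the scroll_id that reaches step 3 is not the one the server issued
(truncated, or issued by a different cluster), I assume the server fails
while decoding it with exactly this kind of arraycopy error.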


Any help would be greatly appreciated.

Thanks and Regards
Mohil

On Sun, Mar 7, 2021 at 2:58 PM Mohil Khare <mo...@prosimo.io> wrote:

> Hello ElasticsearchIO and Beam users/developers,
>
> I am on Beam 2.23.0 and Elasticsearch 6.8.
>
> I have been using ElasticsearchIO.write() successfully.
> For the first time, I am trying to use ElasticsearchIO.read(), because I
> have a use case where I want to read data from one Elasticsearch cluster,
> modify the data, and then write it to another Elasticsearch cluster.
>
> My read transform is very simple:
>
> Pipeline p = input.getPipeline();
>
> return p
>     .apply("Read_From_ES", ElasticsearchIO.read()
>         .withQuery(query)
>         .withBatchSize(50)
>         .withScrollKeepalive("3m")
>         .withConnectionConfiguration(
>             ElasticsearchIO.ConnectionConfiguration.create(
>                     esConnectionParams.getElasticsearchEndpoints(), indexName, "_doc")
>                 .withConnectTimeout(240000)
>                 .withSocketTimeout(240000)
>                 .withUsername(esConnectionParams.getElasticsearchUsername())
>                 .withPassword(esConnectionParams.getElasticsearchPassword()))
>     );
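>
> Downstream of this read, the shape I am aiming for is roughly the
> following (a sketch only; ModifyDocFn and the destination connection
> settings are placeholders, not working code yet):
>
> PCollection<String> docs = ...; // JSON documents produced by ElasticsearchIO.read()
>
> docs
>     .apply("Modify_Docs", ParDo.of(new ModifyDocFn())) // hypothetical DoFn<String, String>
>     .apply("Write_To_ES", ElasticsearchIO.write()
>         .withConnectionConfiguration(
>             ElasticsearchIO.ConnectionConfiguration.create(
>                 destinationEndpoints, destinationIndex, "_doc")));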
>
> But I am unable to submit the job successfully, and I get the following
> exception from my read transform:
>
> WARNING: Size estimation of the source failed:
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource@7c974942
> java.net.ConnectException: Operation timed out
> at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:823)
> at org.elasticsearch.client.RestClient.performRequest(RestClient.java:248)
> at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.getStats(ElasticsearchIO.java:797)
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.getEstimatedSizeBytes(ElasticsearchIO.java:700)
> at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:77)
> at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:51)
> at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:38)
> at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:35)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:484)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
> at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:463)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:423)
> at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:182)
> at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:888)
> at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:194)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
> at io.prosimo.analytics.beam.datacopy.DataCopy.runDataCopy(DataCopy.java:43)
> at io.prosimo.analytics.beam.datacopy.DataCopy.main(DataCopy.java:109)
> Caused by: java.net.ConnectException: Operation timed out
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:174)
> at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:148)
> at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
> at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
> at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
> at java.lang.Thread.run(Thread.java:748)
>
> I tried various values for connectTimeout, socketTimeout, batchSize, and
> scrollKeepalive, but I still hit the same issue.
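>
> One more data point: the trace above shows the timeout inside
> getEstimatedSizeBytes, which runs on the machine submitting the job
> (during pipeline translation), not on the Dataflow workers, so
> connectivity has to work from the launcher host too. As far as I can
> tell, the size estimate hits the index _stats endpoint, so this sketch
> reproduces just that call (placeholder host and index):
>
> import org.apache.http.HttpHost;
> import org.apache.http.util.EntityUtils;
> import org.elasticsearch.client.Request;
> import org.elasticsearch.client.RestClient;
>
> public class StatsCheck {
>   public static void main(String[] args) throws Exception {
>     // Placeholder endpoint and index -- run from the machine that launches the pipeline.
>     try (RestClient client = RestClient.builder(new HttpHost("es-host", 9200, "https")).build()) {
>       System.out.println(EntityUtils.toString(
>           client.performRequest(new Request("GET", "/my-index/_stats")).getEntity()));
>     }
>   }
> }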
>
> Any help would be greatly appreciated.
>
> Thanks and Regards
> Mohil
>