Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2020/11/29 17:12:02 UTC

[jira] [Commented] (BEAM-10990) Elasticsearch IO Infinite loop with write Error when the pipeline job streaming mode

    [ https://issues.apache.org/jira/browse/BEAM-10990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17240293#comment-17240293 ] 

Beam JIRA Bot commented on BEAM-10990:
--------------------------------------

This issue is P2 but has been unassigned without any comment for 60 days so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.

Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.


> Elasticsearch IO Infinite loop with write Error when the pipeline job streaming  mode
> -------------------------------------------------------------------------------------
>
>                 Key: BEAM-10990
>                 URL: https://issues.apache.org/jira/browse/BEAM-10990
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-elasticsearch
>    Affects Versions: 2.24.0
>            Reporter: Steven Gaunt
>            Priority: P2
>              Labels: stale-P2
>             Fix For: 2.26.0
>
>
> When streaming messages from PubsubIO, the pipeline runs in streaming mode.
> If for some reason ElasticsearchIO.Write() gets an error response from the Elasticsearch index API, the WriteFn throws an IOException. Since this exception is thrown inside the Write transform, it becomes an unhandled error. This in turn causes the pipeline job to retry the failing work indefinitely.
> _*snippet from beam website*_
> The Dataflow service retries failed tasks up to 4 times in batch mode, and an unlimited number of times in streaming mode. In batch mode, your job will fail; in streaming, it may stall indefinitely.
>  
> This is the ElasticsearchIO.Write transform.
> {code:java}
>   public PDone expand(PCollection<String> input) {
>             ElasticsearchIO.ConnectionConfiguration connectionConfiguration = this.getConnectionConfiguration();
>             Preconditions.checkState(connectionConfiguration != null, "withConnectionConfiguration() is required");
>             input.apply(ParDo.of(new ElasticsearchIO.Write.WriteFn(this))); // <-- the write happens in WriteFn
>             return PDone.in(input.getPipeline());
>         }
> {code}
> The ParDo function's (WriteFn) finishBundle step calls flushBatch, which calls the ElasticsearchIO.checkForErrors helper method. That method throws an IOException if the JSON in the HTTP response from Elasticsearch reports an error.
> {code:java}
> // Some comments here
>  public void finishBundle(DoFn<String, Void>.FinishBundleContext context) throws IOException, InterruptedException {
>                 this.flushBatch();
>             }
>             private void flushBatch() throws IOException, InterruptedException {
>                 if (!this.batch.isEmpty()) {
>                     StringBuilder bulkRequest = new StringBuilder();
>                     Iterator var2 = this.batch.iterator();
>                     while(var2.hasNext()) {
>                         String json = (String)var2.next();
>                         bulkRequest.append(json);
>                     }
>                     this.batch.clear();
>                     this.currentBatchSizeBytes = 0L;
>                     String endPoint = String.format("/%s/%s/_bulk", this.spec.getConnectionConfiguration().getIndex(), this.spec.getConnectionConfiguration().getType());
>                     HttpEntity requestBody = new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);
>                     Request request = new Request("POST", endPoint);
>                     request.addParameters(Collections.emptyMap());
>                     request.setEntity(requestBody);
>                     Response response = this.restClient.performRequest(request);
>                     HttpEntity responseEntity = new BufferedHttpEntity(response.getEntity());
>                     if (this.spec.getRetryConfiguration() != null && this.spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) {
>                         responseEntity = this.handleRetry("POST", endPoint, Collections.emptyMap(), requestBody);
>                     }
>                     ElasticsearchIO.checkForErrors((HttpEntity)responseEntity, this.backendVersion, this.spec.getUsePartialUpdate());
>                 }
>             }
>  static void checkForErrors(HttpEntity responseEntity, int backendVersion, boolean partialUpdate) throws IOException {
>         JsonNode searchResult = parseResponse(responseEntity);
>         boolean errors = searchResult.path("errors").asBoolean();
>         if (errors) {
>             StringBuilder errorMessages = new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:");
>             JsonNode items = searchResult.path("items");
>             Iterator var7 = items.iterator();
>             while(var7.hasNext()) {
>                 JsonNode item = (JsonNode)var7.next();
>                 String errorRootName = "";
>                 if (partialUpdate) {
>                     errorRootName = "update";
>                 } else if (backendVersion == 2) {
>                     errorRootName = "create";
>                 } else if (backendVersion >= 5) {
>                     errorRootName = "index";
>                 }
>                 JsonNode errorRoot = item.path(errorRootName);
>                 JsonNode error = errorRoot.get("error");
>                 if (error != null) {
>                     String type = error.path("type").asText();
>                     String reason = error.path("reason").asText();
>                     String docId = errorRoot.path("_id").asText();
>                     errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type));
>                     JsonNode causedBy = error.get("caused_by");
>                     if (causedBy != null) {
>                         String cbReason = causedBy.path("reason").asText();
>                         String cbType = causedBy.path("type").asText();
>                         errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType));
>                     }
>                 }
>             }
>             throw new IOException(errorMessages.toString());
>         }
>     }
> {code}
>  
> As a possible suggestion: rather than throwing the exception, would it be possible to write the failed elements to an error-handling TupleTag output, which could then be routed to a dead-letter queue?
>  
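The suggestion above can be illustrated with a minimal sketch in plain Java, with no Beam dependency. It only models the routing step: instead of collecting every item error into one IOException, each bulk item is sent either to a "written" output or to a "dead letter" output. The names BulkResultRouter, ItemResult, Routed, and route are hypothetical and are not part of ElasticsearchIO. In a real pipeline the two lists would presumably correspond to a main output and an additional output identified by TupleTags.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BulkResultRouter {

    /** Hypothetical per-document outcome of one item in a bulk response. */
    public static final class ItemResult {
        public final String docId;
        public final String document;
        public final String errorReason; // null when the document was written

        public ItemResult(String docId, String document, String errorReason) {
            this.docId = docId;
            this.document = document;
            this.errorReason = errorReason;
        }
    }

    /** Hypothetical holder for the two outputs. */
    public static final class Routed {
        public final List<String> written = new ArrayList<>();
        public final List<String> deadLetters = new ArrayList<>();
    }

    /**
     * Routes each item to one of two outputs instead of throwing.
     * Failed items keep their id and reason so a downstream step
     * (for example a dead-letter queue writer) can act on them.
     */
    public static Routed route(List<ItemResult> items) {
        Routed routed = new Routed();
        for (ItemResult item : items) {
            if (item.errorReason == null) {
                routed.written.add(item.document);
            } else {
                routed.deadLetters.add(
                    String.format("Document id %s: %s", item.docId, item.errorReason));
            }
        }
        return routed;
    }

    public static void main(String[] args) {
        Routed routed = route(Arrays.asList(
            new ItemResult("1", "{\"a\":1}", null),
            new ItemResult("2", "{\"a\":\"x\"}", "failed to parse field [a]")));
        System.out.println("written: " + routed.written);
        System.out.println("dead letters: " + routed.deadLetters);
    }
}
```

With this shape a single malformed document no longer fails the whole bundle, so the runner has nothing to retry for it. Whether that trade-off is acceptable depends on how the dead-letter output is monitored.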



--
This message was sent by Atlassian Jira
(v8.3.4#803005)