Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2020/11/29 17:12:02 UTC
[jira] [Commented] (BEAM-10990) Elasticsearch IO infinite loop with write error when the pipeline job is in streaming mode
[ https://issues.apache.org/jira/browse/BEAM-10990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17240293#comment-17240293 ]
Beam JIRA Bot commented on BEAM-10990:
--------------------------------------
This issue is P2 but has been unassigned without any comment for 60 days so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.
Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.
> Elasticsearch IO infinite loop with write error when the pipeline job is in streaming mode
> ------------------------------------------------------------------------------------------
>
> Key: BEAM-10990
> URL: https://issues.apache.org/jira/browse/BEAM-10990
> Project: Beam
> Issue Type: Improvement
> Components: io-java-elasticsearch
> Affects Versions: 2.24.0
> Reporter: Steven Gaunt
> Priority: P2
> Labels: stale-P2
> Fix For: 2.26.0
>
>
> When streaming messages from PubsubIO, the pipeline runs in streaming mode.
> If for some reason ElasticsearchIO.Write() receives an error response from the Elasticsearch index API, the WriteFn throws an IOException. Because this exception is thrown inside the Write transform, it becomes an unhandled error, which inherently causes the job pipeline to retry the failing work indefinitely.
> _*snippet from the Beam website*_
> The Dataflow service retries failed tasks up to 4 times in batch mode, and an unlimited number of times in streaming mode. In batch mode, your job will fail; in streaming, it may stall indefinitely.
>
> This is the ElasticsearchIO.Write transform:
> {code:java}
> public PDone expand(PCollection<String> input) {
>   ElasticsearchIO.ConnectionConfiguration connectionConfiguration = this.getConnectionConfiguration();
>   Preconditions.checkState(connectionConfiguration != null, "withConnectionConfiguration() is required");
>   input.apply(ParDo.of(new ElasticsearchIO.Write.WriteFn(this))); // <-- the WriteFn that can throw
>   return PDone.in(input.getPipeline());
> }
> {code}
> In its finishBundle step, the ParDo function (WriteFn) calls the ElasticsearchIO.checkForErrors helper method, which throws an IOException if the JSON response from Elasticsearch reports errors:
> {code:java}
> public void finishBundle(DoFn<String, Void>.FinishBundleContext context) throws IOException, InterruptedException {
>   this.flushBatch();
> }
>
> private void flushBatch() throws IOException, InterruptedException {
>   if (!this.batch.isEmpty()) {
>     StringBuilder bulkRequest = new StringBuilder();
>     for (String json : this.batch) {
>       bulkRequest.append(json);
>     }
>     this.batch.clear();
>     this.currentBatchSizeBytes = 0L;
>     String endPoint = String.format("/%s/%s/_bulk",
>         this.spec.getConnectionConfiguration().getIndex(),
>         this.spec.getConnectionConfiguration().getType());
>     HttpEntity requestBody = new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);
>     Request request = new Request("POST", endPoint);
>     request.addParameters(Collections.emptyMap());
>     request.setEntity(requestBody);
>     Response response = this.restClient.performRequest(request);
>     HttpEntity responseEntity = new BufferedHttpEntity(response.getEntity());
>     if (this.spec.getRetryConfiguration() != null
>         && this.spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) {
>       responseEntity = this.handleRetry("POST", endPoint, Collections.emptyMap(), requestBody);
>     }
>     // Throws IOException if the bulk response reports any per-document errors.
>     ElasticsearchIO.checkForErrors((HttpEntity) responseEntity, this.backendVersion, this.spec.getUsePartialUpdate());
>   }
> }
>
> static void checkForErrors(HttpEntity responseEntity, int backendVersion, boolean partialUpdate) throws IOException {
>   JsonNode searchResult = parseResponse(responseEntity);
>   boolean errors = searchResult.path("errors").asBoolean();
>   if (errors) {
>     StringBuilder errorMessages =
>         new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:");
>     JsonNode items = searchResult.path("items");
>     for (JsonNode item : items) {
>       String errorRootName = "";
>       if (partialUpdate) {
>         errorRootName = "update";
>       } else if (backendVersion == 2) {
>         errorRootName = "create";
>       } else if (backendVersion >= 5) {
>         errorRootName = "index";
>       }
>       JsonNode errorRoot = item.path(errorRootName);
>       JsonNode error = errorRoot.get("error");
>       if (error != null) {
>         String type = error.path("type").asText();
>         String reason = error.path("reason").asText();
>         String docId = errorRoot.path("_id").asText();
>         errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type));
>         JsonNode causedBy = error.get("caused_by");
>         if (causedBy != null) {
>           String cbReason = causedBy.path("reason").asText();
>           String cbType = causedBy.path("type").asText();
>           errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType));
>         }
>       }
>     }
>     throw new IOException(errorMessages.toString());
>   }
> }
> {code}
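> For context, checkForErrors fires whenever the bulk response carries "errors": true. A response of roughly the following shape (index name, document id, and error values are made up for illustration) would make it throw:
> {code:json}
> {
>   "took": 5,
>   "errors": true,
>   "items": [
>     {
>       "index": {
>         "_index": "my-index",
>         "_id": "doc-1",
>         "status": 400,
>         "error": {
>           "type": "mapper_parsing_exception",
>           "reason": "failed to parse field [price]",
>           "caused_by": { "type": "number_format_exception", "reason": "For input string: \"abc\"" }
>         }
>       }
>     }
>   ]
> }
> {code}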
>
> As a possible suggestion, rather than throwing the exception, could the failed documents be written to an error-handling TupleTag output, which could then be routed to a dead-letter queue?
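>
> A minimal pure-Java sketch of that idea (no Beam dependencies; the class and method names here are hypothetical, not part of ElasticsearchIO): instead of aborting the bundle with an IOException, each bulk item result is routed to either a success bucket or a dead-letter bucket, mirroring what a WriteFn emitting failures to a separate TupleTag output would do.
>
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
>
> // Hypothetical sketch of dead-letter routing for bulk write failures.
> // Mirrors a multi-output ParDo: successes go to the main output, failures
> // to a second output that a downstream transform drains to a dead-letter queue.
> public class DeadLetterSketch {
>
>   // Minimal stand-in for one parsed item of an Elasticsearch _bulk response.
>   public static final class BulkItemResult {
>     public final String docId;
>     public final String errorReason; // null means the document was indexed OK
>
>     public BulkItemResult(String docId, String errorReason) {
>       this.docId = docId;
>       this.errorReason = errorReason;
>     }
>   }
>
>   // Holds the two "outputs" of the routing step.
>   public static final class Routed {
>     public final List<String> indexed = new ArrayList<>();
>     public final List<String> deadLetter = new ArrayList<>();
>   }
>
>   // Instead of building one error message and throwing IOException
>   // (as checkForErrors does), split the items into two outputs.
>   public static Routed route(List<BulkItemResult> items) {
>     Routed out = new Routed();
>     for (BulkItemResult item : items) {
>       if (item.errorReason == null) {
>         out.indexed.add(item.docId);
>       } else {
>         out.deadLetter.add(item.docId + ": " + item.errorReason);
>       }
>     }
>     return out;
>   }
>
>   public static void main(String[] args) {
>     Routed r = route(List.of(
>         new BulkItemResult("doc-1", null),
>         new BulkItemResult("doc-2", "mapper_parsing_exception")));
>     System.out.println("indexed=" + r.indexed);       // indexed=[doc-1]
>     System.out.println("deadLetter=" + r.deadLetter); // deadLetter=[doc-2: mapper_parsing_exception]
>   }
> }
> {code}
>
> In an actual Beam pipeline this routing would live in the WriteFn (or a wrapper transform) using ParDo with output tags, and the dead-letter PCollection could then be written to Pub/Sub, GCS, or BigQuery for later inspection instead of stalling the streaming job.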
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)