Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/20 04:17:30 UTC

[GitHub] [incubator-seatunnel] harveyyue opened a new pull request, #3148: [Improve][Connector-V2][ElasticSearch] Improve es bulk sink retriable mechanism

harveyyue opened a new pull request, #3148:
URL: https://github.com/apache/incubator-seatunnel/pull/3148

   The code changes are as follows:
   1. Improve the retriable mechanism of the ES bulk sink
   2. Introduce a method `computeRetryWaitTimeMillis` to compute the backoff wait time between retries
   3. Fix the attempt-count bug in method `retryWithException`
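The diff later in this thread changes the loop guard in `retryWithException` from `while (i <= retryTimes)` to `while (i < retryTimes)`. Below is a minimal sketch of why the guard matters. It assumes `i` is incremented once before each attempt, because the top of the loop is not shown in the diff. With `<`, the body runs exactly `retryTimes` times; with `<=`, it runs one extra attempt. `RetryLoopSketch` and `retry` are hypothetical names, and `java.util.concurrent.Callable` stands in for SeaTunnel's `Execution` interface. Backoff and the retriable-exception filter are left out.

```java
import java.util.concurrent.Callable;

public class RetryLoopSketch {

    // Hypothetical simplification of RetryUtils.retryWithException:
    // no backoff, no retriable-exception filter, always rethrows after the last attempt.
    public static <T> T retry(Callable<T> execution, int retryTimes) {
        if (retryTimes < 1) {
            throw new IllegalArgumentException("retryTimes must be at least 1");
        }
        int i = 0;
        Exception lastException = null;
        do {
            i++; // 1-based number of the attempt about to run
            try {
                return execution.call();
            } catch (Exception e) {
                lastException = e;
            }
        } while (i < retryTimes); // `i <= retryTimes` would run retryTimes + 1 attempts
        throw new RuntimeException(
                "Execute given execution failed after retry " + retryTimes + " times", lastException);
    }
}
```

With `retryTimes = 3` and an execution that always fails, the sketch calls the execution three times and then throws. The exception's cause is the last failure.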
   
   ## Purpose of this pull request
   
   ## Check list
   
   * [ ] Code changes are covered with tests, or it does not need tests for reason:
   * [ ] If any new Jar binary package is added in your PR, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 merged pull request #3148: [Improve][Connector-V2][ElasticSearch] Improve es bulk sink retriable mechanism

Posted by GitBox <gi...@apache.org>.
EricJoy2048 merged PR #3148:
URL: https://github.com/apache/incubator-seatunnel/pull/3148




[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #3148: [Improve][Connector-V2][ElasticSearch] Improve es bulk sink retriable mechanism

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on PR #3148:
URL: https://github.com/apache/incubator-seatunnel/pull/3148#issuecomment-1299521888

   Please resolve the CI problem, thanks.




[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #3148: [Improve][Connector-V2][ElasticSearch] Improve es bulk sink retriable mechanism

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #3148:
URL: https://github.com/apache/incubator-seatunnel/pull/3148#discussion_r1001342147


##########
seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java:
##########
@@ -17,7 +17,13 @@
 
 package org.apache.seatunnel.common.utils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
 public class RetryUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(RetryUtils.class);

Review Comment:
   Please use the Lombok `@Slf4j` annotation instead of declaring the logger by hand; see https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java for reference.



##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java:
##########
@@ -99,29 +105,27 @@ public Optional<ElasticsearchCommitInfo> prepareCommit() {
     public void abortPrepare() {
     }
 
-    public void bulkEsWithRetry(EsRestClient esRestClient, List<String> requestEsList, int maxRetry) {
-        for (int tryCnt = 1; tryCnt <= maxRetry; tryCnt++) {
-            if (requestEsList.size() > 0) {
-                String requestBody = String.join("\n", requestEsList) + "\n";
-                try {
+    public void bulkEsWithRetry(EsRestClient esRestClient, List<String> requestEsList) {
+        try {
+            RetryUtils.retryWithException(() -> {
+                if (requestEsList.size() > 0) {
+                    String requestBody = String.join("\n", requestEsList) + "\n";
                     BulkResponse bulkResponse = esRestClient.bulk(requestBody);
-                    if (!bulkResponse.isErrors()) {
-                        break;
+                    if (bulkResponse.isErrors()) {
+                        throw new BulkElasticsearchException("bulk es error: " + bulkResponse.getResponse());
                     }
-                } catch (Exception ex) {
-                    if (tryCnt == maxRetry) {
-                        throw new BulkElasticsearchException("bulk es error,try count=%d", ex);
-                    }
-                    log.warn(String.format("bulk es error,try count=%d", tryCnt), ex);
+                    return bulkResponse;
                 }
-
-            }
+                return null;
+            }, retryMaterial);
+        } catch (Exception e) {
+            throw new RuntimeException("ElasticSearch execute batch statement error", e);

Review Comment:
   Please use `SeaTunnelException` here. There is an ongoing discussion about a unified exception definition; if you are interested, see https://github.com/apache/incubator-seatunnel/pull/3045.
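The change above moves the bulk call into `RetryUtils.retryWithException` and throws when `bulkResponse.isErrors()` is true. Item-level failures therefore trigger a retry, and once attempts are exhausted they surface as an exception instead of being silently dropped. Below is a minimal, self-contained sketch of that pattern, with a plain loop in place of `RetryUtils`. Several names are hypothetical stand-ins:

- `BulkClient` for `EsRestClient`
- `BulkResult` for `BulkResponse`
- `bulkWithRetry(..., maxAttempts)` for the real method
- `RuntimeException` for `SeaTunnelException`, whose constructors are not shown here

```java
import java.util.List;

public class BulkRetrySketch {

    // Hypothetical stand-in for EsRestClient.bulk(String).
    public interface BulkClient {
        BulkResult bulk(String requestBody) throws Exception;
    }

    // Hypothetical stand-in for BulkResponse: the `errors` flag plus the raw response.
    public static final class BulkResult {
        public final boolean errors;
        public final String response;

        public BulkResult(boolean errors, String response) {
            this.errors = errors;
            this.response = response;
        }
    }

    public static BulkResult bulkWithRetry(BulkClient client, List<String> requestEsList, int maxAttempts) {
        if (requestEsList.isEmpty()) {
            return null; // nothing to flush, mirroring the `return null` branch in the diff
        }
        // The _bulk endpoint takes newline-delimited JSON that must end with a newline.
        String requestBody = String.join("\n", requestEsList) + "\n";
        Exception lastException = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                BulkResult result = client.bulk(requestBody);
                if (result.errors) {
                    // Turn item-level failures into an exception so the batch is retried.
                    throw new IllegalStateException("bulk es error: " + result.response);
                }
                return result;
            } catch (Exception e) {
                lastException = e;
            }
        }
        // Placeholder: the review asks for SeaTunnelException here.
        throw new RuntimeException("ElasticSearch execute batch statement error", lastException);
    }
}
```

Retrying the whole body also resends items that already succeeded. That is harmless for `index` actions with explicit document IDs, but it can create duplicates when Elasticsearch generates the IDs.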





[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #3148: [Improve][Connector-V2][ElasticSearch] Improve es bulk sink retriable mechanism

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #3148:
URL: https://github.com/apache/incubator-seatunnel/pull/3148#discussion_r1027048963


##########
seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java:
##########
@@ -46,18 +51,36 @@ public static <T> T retryWithException(Execution<T, Exception> execution, RetryM
                     if (retryMaterial.shouldThrowException()) {
                         throw e;
                     }
-                } else if (retryMaterial.getSleepTimeMillis() > 0) {
-                    Thread.sleep(retryMaterial.getSleepTimeMillis());
+                } else {
+                    // Otherwise it is retriable and we should retry
+                    String attemptMessage = "Failed to execute due to {}. Retrying attempt ({}/{}) after backoff of {} ms";
+                    if (retryMaterial.getSleepTimeMillis() > 0) {
+                        long backoff = retryMaterial.computeRetryWaitTimeMillis(i);
+                        log.warn(attemptMessage, e.getCause(), i, retryTimes, backoff);
+                        Thread.sleep(backoff);
+                    } else {
+                        log.warn(attemptMessage, e.getCause(), i, retryTimes, 0);
+                    }
                 }
             }
-        } while (i <= retryTimes);
+        } while (i < retryTimes);
         if (retryMaterial.shouldThrowException()) {
             throw new RuntimeException("Execute given execution failed after retry " + retryTimes + " times", lastException);
         }
         return null;
     }
 
     public static class RetryMaterial {
+        /**
+         * An arbitrary absolute maximum practical retry time.
+         */
+        public static final long MAX_RETRY_TIME_MS = TimeUnit.HOURS.toMillis(1);

Review Comment:
   A maximum retry time of 1 hour is too long; 1 minute would be better.
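The diff does not show the body of `computeRetryWaitTimeMillis`. Below is a minimal sketch of a capped backoff. It assumes exponential growth (the base sleep time doubled on each attempt) and the 1-minute cap suggested in this review. The class name and the two-argument signature are hypothetical. The doubling stops as soon as the cap is reached, so the multiplication cannot overflow.

```java
import java.util.concurrent.TimeUnit;

public class BackoffSketch {

    // Cap suggested in review: one minute instead of one hour.
    public static final long MAX_RETRY_TIME_MS = TimeUnit.MINUTES.toMillis(1);

    // Hypothetical exponential backoff: sleepTimeMillis * 2^(attempt - 1), capped at MAX_RETRY_TIME_MS.
    public static long computeRetryWaitTimeMillis(int attempt, long sleepTimeMillis) {
        if (attempt <= 0 || sleepTimeMillis <= 0) {
            return 0L;
        }
        long backoff = sleepTimeMillis;
        // Double only while below the cap, so backoff never exceeds 2 * MAX_RETRY_TIME_MS here.
        for (int k = 1; k < attempt && backoff < MAX_RETRY_TIME_MS; k++) {
            backoff *= 2;
        }
        return Math.min(backoff, MAX_RETRY_TIME_MS);
    }
}
```

For example, with a 1000 ms base sleep the waits are 1000, 2000, 4000, ... ms, and they stay at the 60000 ms cap from the seventh attempt onward.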





[GitHub] [incubator-seatunnel] harveyyue commented on a diff in pull request #3148: [Improve][Connector-V2][ElasticSearch] Improve es bulk sink retriable mechanism

Posted by GitBox <gi...@apache.org>.
harveyyue commented on code in PR #3148:
URL: https://github.com/apache/incubator-seatunnel/pull/3148#discussion_r1001663022


##########
seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java:
##########
@@ -17,7 +17,13 @@
 
 package org.apache.seatunnel.common.utils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
 public class RetryUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(RetryUtils.class);

Review Comment:
   Done.



##########
seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java:
##########
@@ -99,29 +105,27 @@ public Optional<ElasticsearchCommitInfo> prepareCommit() {
     public void abortPrepare() {
     }
 
-    public void bulkEsWithRetry(EsRestClient esRestClient, List<String> requestEsList, int maxRetry) {
-        for (int tryCnt = 1; tryCnt <= maxRetry; tryCnt++) {
-            if (requestEsList.size() > 0) {
-                String requestBody = String.join("\n", requestEsList) + "\n";
-                try {
+    public void bulkEsWithRetry(EsRestClient esRestClient, List<String> requestEsList) {
+        try {
+            RetryUtils.retryWithException(() -> {
+                if (requestEsList.size() > 0) {
+                    String requestBody = String.join("\n", requestEsList) + "\n";
                     BulkResponse bulkResponse = esRestClient.bulk(requestBody);
-                    if (!bulkResponse.isErrors()) {
-                        break;
+                    if (bulkResponse.isErrors()) {
+                        throw new BulkElasticsearchException("bulk es error: " + bulkResponse.getResponse());
                     }
-                } catch (Exception ex) {
-                    if (tryCnt == maxRetry) {
-                        throw new BulkElasticsearchException("bulk es error,try count=%d", ex);
-                    }
-                    log.warn(String.format("bulk es error,try count=%d", tryCnt), ex);
+                    return bulkResponse;
                 }
-
-            }
+                return null;
+            }, retryMaterial);
+        } catch (Exception e) {
+            throw new RuntimeException("ElasticSearch execute batch statement error", e);

Review Comment:
   Done.





[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #3148: [Improve][Connector-V2][ElasticSearch] Improve es bulk sink retriable mechanism

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on PR #3148:
URL: https://github.com/apache/incubator-seatunnel/pull/3148#issuecomment-1320823351

   The rest of the code looks fine.




[GitHub] [incubator-seatunnel] harveyyue commented on a diff in pull request #3148: [Improve][Connector-V2][ElasticSearch] Improve es bulk sink retriable mechanism

Posted by GitBox <gi...@apache.org>.
harveyyue commented on code in PR #3148:
URL: https://github.com/apache/incubator-seatunnel/pull/3148#discussion_r1027466091


##########
seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/RetryUtils.java:
##########
@@ -46,18 +51,36 @@ public static <T> T retryWithException(Execution<T, Exception> execution, RetryM
                     if (retryMaterial.shouldThrowException()) {
                         throw e;
                     }
-                } else if (retryMaterial.getSleepTimeMillis() > 0) {
-                    Thread.sleep(retryMaterial.getSleepTimeMillis());
+                } else {
+                    // Otherwise it is retriable and we should retry
+                    String attemptMessage = "Failed to execute due to {}. Retrying attempt ({}/{}) after backoff of {} ms";
+                    if (retryMaterial.getSleepTimeMillis() > 0) {
+                        long backoff = retryMaterial.computeRetryWaitTimeMillis(i);
+                        log.warn(attemptMessage, e.getCause(), i, retryTimes, backoff);
+                        Thread.sleep(backoff);
+                    } else {
+                        log.warn(attemptMessage, e.getCause(), i, retryTimes, 0);
+                    }
                 }
             }
-        } while (i <= retryTimes);
+        } while (i < retryTimes);
         if (retryMaterial.shouldThrowException()) {
             throw new RuntimeException("Execute given execution failed after retry " + retryTimes + " times", lastException);
         }
         return null;
     }
 
     public static class RetryMaterial {
+        /**
+         * An arbitrary absolute maximum practical retry time.
+         */
+        public static final long MAX_RETRY_TIME_MS = TimeUnit.HOURS.toMillis(1);

Review Comment:
   Agree.


