You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2022/10/21 02:11:10 UTC

[doris] branch master updated: [improvement](regression-test) wait for publish timeout of stream load (#13531)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e168c87c6 [improvement](regression-test) wait for publish timeout of stream load (#13531)
3e168c87c6 is described below

commit 3e168c87c6b93afc012cb64412ab9b369a41bd18
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Fri Oct 21 10:11:03 2022 +0800

    [improvement](regression-test) wait for publish timeout of stream load (#13531)
---
 regression-test/conf/regression-conf.groovy        |  4 +--
 .../regression/action/StreamLoadAction.groovy      | 38 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy
index 0f483190c4..4577d0fe08 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -20,11 +20,11 @@
 // **Note**: default db will be create if not exist
 defaultDb = "regression_test"
 
-jdbcUrl = "jdbc:mysql://127.0.0.1:9033/?"
+jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?"
 jdbcUser = "root"
 jdbcPassword = ""
 
-feHttpAddress = "127.0.0.1:8033"
+feHttpAddress = "127.0.0.1:8030"
 feHttpUser = "root"
 feHttpPassword = ""
 
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
index ab3eeadd7b..075f751ae9 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/action/StreamLoadAction.groovy
@@ -22,6 +22,7 @@ import groovy.transform.stc.ClosureParams
 import groovy.transform.stc.FromString
 import org.apache.doris.regression.suite.SuiteContext
 import org.apache.doris.regression.util.BytesInputStream
+import org.apache.doris.regression.util.JdbcUtils
 import org.apache.doris.regression.util.OutputUtils
 import groovy.json.JsonSlurper
 import groovy.util.logging.Slf4j
@@ -286,6 +287,9 @@ class StreamLoadAction implements SuiteAction {
     }
 
     private void checkResult(String responseText, Throwable ex, long startTime, long endTime) {
+        String finalStatus = waitForPublishOrFailure(responseText)
+        log.info("The origin stream load result: ${responseText}, final status: ${finalStatus}")
+        responseText = responseText.replace("Publish Timeout", finalStatus)
         if (check != null) {
             check.call(responseText, ex, startTime, endTime)
         } else {
@@ -323,4 +327,38 @@ class StreamLoadAction implements SuiteAction {
             }
         }
     }
+
+    /**
+     * Resolves the final status of a stream load whose response reports "Publish Timeout".
+     *
+     * A "Publish Timeout" response is not a fatal error, but it can make a test fail. When the
+     * response carries that status, this method polls "show transaction" every 2 seconds for at
+     * most 60 seconds and stops as soon as the transaction becomes VISIBLE or ABORTED.
+     *
+     * @param responseText JSON body returned by the stream load request
+     * @return the response's own Status when it is not "Publish Timeout" ("Fail" when the
+     *         response has no Status field at all); otherwise "Success" if the transaction
+     *         became VISIBLE within the wait window, and "Fail" if it was ABORTED or was still
+     *         unpublished when the wait window ran out
+     * @throws IllegalStateException if a "Publish Timeout" response has no TxnId, or if
+     *         "show transaction" does not return exactly one row for the transaction
+     */
+    private String waitForPublishOrFailure(String responseText) {
+        long maxWaitSecond = 60
+        def parsed = new JsonSlurper().parseText(responseText)
+        String status = parsed.Status
+        // Constant-first comparison is null-safe: a response without a Status field must not
+        // blow up here with a NullPointerException.
+        if (!"Publish Timeout".equalsIgnoreCase(status)) {
+            // Never return null: a missing Status is reported as "Fail" so the caller always
+            // receives a usable status string.
+            return status != null ? status : "Fail"
+        }
+
+        // TxnId is only needed on the polling path, so it is read here rather than up front;
+        // responses that carry no TxnId therefore no longer fail before the status check.
+        def rawTxnId = parsed.TxnId
+        if (rawTxnId == null) {
+            throw new IllegalStateException(
+                    "Stream load response is publish timeout but has no TxnId: ${responseText}")
+        }
+        long txnId = rawTxnId
+
+        log.info("Stream load with txn ${txnId} is publish timeout")
+        String sql = "show transaction from ${db} where id = ${txnId}"
+        String st = "PREPARE"
+        while (!st.equalsIgnoreCase("VISIBLE") && !st.equalsIgnoreCase("ABORTED") && maxWaitSecond > 0) {
+            // Sleep before each query, the first one included, to give the publish time to finish.
+            Thread.sleep(2000)
+            maxWaitSecond -= 2
+            def (result, meta) = JdbcUtils.executeToStringList(context.getConnection(), sql)
+            if (result.size() != 1) {
+                throw new IllegalStateException(
+                        "Failed to get status of txn ${txnId}: expected 1 row but got ${result.size()}")
+            }
+            // NOTE(review): index 3 is presumably the TransactionStatus column of
+            // "show transaction" -- confirm against the FE output format.
+            st = String.valueOf(result[0][3])
+        }
+        log.info("Stream load with txn ${txnId} is ${st}")
+        // Anything other than VISIBLE (ABORTED, or still pending after the wait) counts as failure.
+        return st.equalsIgnoreCase("VISIBLE") ? "Success" : "Fail"
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org