You are viewing a plain-text version of this content. The canonical link for it (the "here" hyperlink in the original page) is not preserved in this copy.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2024/02/02 11:01:18 UTC

(seatunnel) branch dev updated: [Fix] Fix doris stream load failed not reported error (#6315)

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a09a5a2bb8 [Fix] Fix doris stream load failed not reported error (#6315)
a09a5a2bb8 is described below

commit a09a5a2bb8ba9ad0ebde2064c63fbb2a269ec41a
Author: Jia Fan <fa...@qq.com>
AuthorDate: Fri Feb 2 19:01:12 2024 +0800

    [Fix] Fix doris stream load failed not reported error (#6315)
---
 docs/en/connector-v2/sink/Doris.md                 |   2 +-
 .../connectors/doris/config/DorisOptions.java      |   2 +-
 .../doris/sink/writer/DorisSinkWriter.java         |   2 +-
 .../e2e/connector/doris/AbstractDorisIT.java       |   2 +-
 .../e2e/connector/doris/DorisErrorIT.java          | 137 +++++++++++++++++++++
 .../fake_source_and_doris_sink_timeout_error.conf  |  67 ++++++++++
 .../flink/AbstractTestFlinkContainer.java          |   2 +-
 7 files changed, 209 insertions(+), 5 deletions(-)

diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md
index 620e9e8fa5..7e8d9c6341 100644
--- a/docs/en/connector-v2/sink/Doris.md
+++ b/docs/en/connector-v2/sink/Doris.md
@@ -36,7 +36,7 @@ The internal implementation of Doris sink connector is cached and imported by st
 | table                          | String  | Yes      | -                            | The table name of `Doris` table,  use `${table_name}` to represent the upstream table name                                                                                                                                                                                                   |
 | table.identifier               | String  | Yes      | -                            | The name of `Doris` table, it will deprecate after version 2.3.5, please use `database` and `table` instead.                                                                                                                                                                                 |
 | sink.label-prefix              | String  | Yes      | -                            | The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of SeaTunnel.                                                                                                                                                   |
-| sink.enable-2pc                | bool    | No       | -                            | Whether to enable two-phase commit (2pc), the default is true, to ensure Exactly-Once semantics. For two-phase commit, please refer to [here](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD).                                     |
+| sink.enable-2pc                | bool    | No       | false                        | Whether to enable two-phase commit (2pc) for Exactly-Once semantics; the default is false. For two-phase commit, please refer to [here](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD).                                           |
 | sink.enable-delete             | bool    | No       | -                            | Whether to enable deletion. This option requires Doris table to enable batch delete function (0.15+ version is enabled by default), and only supports Unique model. you can get more detail at this [link](https://doris.apache.org/docs/dev/data-operate/update-delete/batch-delete-manual) |
 | sink.check-interval            | int     | No       | 10000                        | check exception with the interval while loading                                                                                                                                                                                                                                              |
 | sink.max-retries               | int     | No       | 3                            | the max retry times if writing records to database failed                                                                                                                                                                                                                                    |
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
index 36168c43bb..87f9ddcff8 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
@@ -152,7 +152,7 @@ public interface DorisOptions {
     Option<Boolean> SINK_ENABLE_2PC =
             Options.key("sink.enable-2pc")
                     .booleanType()
-                    .defaultValue(true)
+                    .defaultValue(false)
                     .withDescription("enable 2PC while loading");
 
     Option<Integer> SINK_CHECK_INTERVAL =
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index 8c945e0fed..323bac17ef 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -116,7 +116,7 @@ public class DorisSinkWriter
         }
         // get main work thread.
         executorThread = Thread.currentThread();
-        dorisStreamLoad.startLoad(labelGenerator.generateLabel(lastCheckpointId + 1));
+        startLoad(labelGenerator.generateLabel(lastCheckpointId + 1));
         // when uploading data in streaming mode, we need to regularly detect whether there are
         // exceptions.
         scheduledExecutorService.scheduleWithFixedDelay(
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
index f8d48e465b..458a900b4b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
@@ -85,7 +85,7 @@ public abstract class AbstractDorisIT extends TestSuiteBase implements TestResou
                 .untilAsserted(this::initializeJdbcConnection);
     }
 
-    private void initializeJdbcConnection() throws SQLException {
+    protected void initializeJdbcConnection() throws SQLException {
         Properties props = new Properties();
         props.put("user", USERNAME);
         props.put("password", PASSWORD);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisErrorIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisErrorIT.java
new file mode 100644
index 0000000000..77936f230b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisErrorIT.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.doris;
+
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.given;
+
+@Slf4j
+public class DorisErrorIT extends AbstractDorisIT { // E2E check: a Doris sink job must exit non-zero when Doris is stopped while the job is still running
+    private static final String TABLE = "doris_e2e_table"; // must match table.identifier in fake_source_and_doris_sink_timeout_error.conf
+    private static final String DRIVER_JAR = // MySQL JDBC driver downloaded into the engine container by extendedFactory
+            "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
+
+    private static final String sinkDB = "e2e_sink"; // sink database; the .conf writes to e2e_sink.doris_e2e_table
+
+    @TestContainerExtension
+    protected final ContainerExtendedFactory extendedFactory = // places the JDBC driver under /tmp/seatunnel/plugins/jdbc/lib
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/jdbc/lib && cd /tmp/seatunnel/plugins/jdbc/lib && wget "
+                                        + DRIVER_JAR);
+                Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr()); // fail fast (with stderr) if the download fails
+            };
+
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK}, // only engines other than Spark/Flink run this test
+            disabledReason = "flink/spark failed reason not same")
+    public void testDoris(TestContainer container) throws InterruptedException, ExecutionException {
+        initializeJdbcTable(); // create e2e_sink.doris_e2e_table before the job is submitted
+        CompletableFuture<Container.ExecResult> future = // run the job asynchronously so Doris can be stopped mid-run
+                CompletableFuture.supplyAsync(
+                        () -> {
+                            try {
+                                return container.executeJob(
+                                        "/fake_source_and_doris_sink_timeout_error.conf");
+                            } catch (IOException | InterruptedException e) { // NOTE(review): the interrupt flag is not restored before wrapping
+                                throw new RuntimeException(e);
+                            }
+                        });
+        // fixed 10 s wait for the job to start (time-based, so it may be flaky on slow CI)
+        Thread.sleep(10 * 1000);
+        super.container.stop(); // stops the base-class container, not the TestContainer parameter that runs the job
+        Assertions.assertNotEquals(0, future.get().getExitCode()); // blocks until the job exits; losing Doris must surface as a non-zero exit code
+        super.container.start(); // restart it; presumably so later tests find a live instance
+        // wait until the restarted container accepts JDBC connections again
+        given().ignoreExceptions()
+                .await()
+                .atMost(10000, TimeUnit.SECONDS) // NOTE(review): 10000 s (~2.8 h) upper bound; confirm the intended unit
+                .untilAsserted(this::initializeJdbcConnection);
+    }
+
+    private void initializeJdbcTable() { // creates the sink database and table; any failure is rethrown as RuntimeException
+        try {
+            try (Statement statement = jdbcConnection.createStatement()) {
+                // create the sink database (idempotent: IF NOT EXISTS)
+                statement.execute(createDatabase(sinkDB));
+                log.info("create sink database succeed");
+                // create the sink table inside it
+                statement.execute(createTableForTest(sinkDB));
+            } catch (SQLException e) {
+                throw new RuntimeException("Initializing table failed!", e); // NOTE(review): re-wrapped again by the outer catch below
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Initializing jdbc failed!", e);
+        }
+    }
+
+    private String createDatabase(String db) { // DDL: CREATE DATABASE IF NOT EXISTS <db>
+        return String.format("CREATE DATABASE IF NOT EXISTS %s ;", db);
+    }
+
+    private String createTableForTest(String db) { // DDL for <db>.doris_e2e_table: duplicate key on F_ID, 1 hash bucket, 1 replica; column names mirror the FakeSource schema in the .conf
+        String createTableSql =
+                "create table if not exists `%s`.`%s`(\n"
+                        + "F_ID bigint null,\n"
+                        + "F_INT int null,\n"
+                        + "F_BIGINT bigint null,\n"
+                        + "F_TINYINT tinyint null,\n"
+                        + "F_SMALLINT smallint null,\n"
+                        + "F_DECIMAL decimal(18,6) null,\n"
+                        + "F_LARGEINT largeint null,\n"
+                        + "F_BOOLEAN boolean null,\n"
+                        + "F_DOUBLE double null,\n"
+                        + "F_FLOAT float null,\n"
+                        + "F_CHAR char null,\n"
+                        + "F_VARCHAR_11 varchar(11) null,\n"
+                        + "F_STRING string null,\n"
+                        + "F_DATETIME_P datetime(6),\n"
+                        + "F_DATETIME datetime,\n"
+                        + "F_DATE date\n"
+                        + ")\n"
+                        + "duplicate KEY(`F_ID`)\n"
+                        + "DISTRIBUTED BY HASH(`F_ID`) BUCKETS 1\n"
+                        + "properties(\n"
+                        + "\"replication_allocation\" = \"tag.location.default: 1\""
+                        + ");";
+        return String.format(createTableSql, db, TABLE);
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/fake_source_and_doris_sink_timeout_error.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/fake_source_and_doris_sink_timeout_error.conf
new file mode 100644
index 0000000000..53e0eadaa4
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/fake_source_and_doris_sink_timeout_error.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env{
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source{
+  FakeSource {
+    row.num = 100
+    split.num = 10
+    split.read-interval = 10000
+    string.length = 1
+    schema = {
+      fields {
+        F_ID = "string"
+        F_INT = "int"
+        F_BIGINT = "time"
+        F_TINYINT = "tinyint"
+        F_SMALLINT = "smallint"
+        F_DECIMAL = "decimal(10,2)"
+        F_LARGEINT = "bigint"
+        F_BOOLEAN = "boolean"
+        F_DOUBLE = "double"
+        F_FLOAT = "float"
+        F_CHAR = "string"
+        F_VARCHAR_11 = "string"
+        F_STRING = "string"
+        F_DATETIME_P = "timestamp"
+        F_DATETIME = "timestamp"
+        F_DATE = "date"
+      }
+    }
+  }
+}
+
+transform {}
+
+sink{
+  Doris {
+          fenodes = "doris_e2e:8030"
+          username = root
+          password = ""
+          table.identifier = "e2e_sink.doris_e2e_table"
+          sink.enable-2pc = "true"
+          sink.label-prefix = "test_json"
+          doris.config = {
+              format="json"
+              read_json_by_line="true"
+          }
+      }
+  }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index 82e8e79c11..91320b3d9a 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -155,7 +155,7 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
 
     @Override
     public String getServerLogs() {
-        return jobManager.getLogs();
+        return jobManager.getLogs() + "\n" + taskManager.getLogs();
     }
 
     public String executeJobManagerInnerCommand(String command)