You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2024/02/02 11:01:18 UTC
(seatunnel) branch dev updated: [Fix] Fix doris stream load failed not reported error (#6315)
This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a09a5a2bb8 [Fix] Fix doris stream load failed not reported error (#6315)
a09a5a2bb8 is described below
commit a09a5a2bb8ba9ad0ebde2064c63fbb2a269ec41a
Author: Jia Fan <fa...@qq.com>
AuthorDate: Fri Feb 2 19:01:12 2024 +0800
[Fix] Fix doris stream load failed not reported error (#6315)
---
docs/en/connector-v2/sink/Doris.md | 2 +-
.../connectors/doris/config/DorisOptions.java | 2 +-
.../doris/sink/writer/DorisSinkWriter.java | 2 +-
.../e2e/connector/doris/AbstractDorisIT.java | 2 +-
.../e2e/connector/doris/DorisErrorIT.java | 137 +++++++++++++++++++++
.../fake_source_and_doris_sink_timeout_error.conf | 67 ++++++++++
.../flink/AbstractTestFlinkContainer.java | 2 +-
7 files changed, 209 insertions(+), 5 deletions(-)
diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md
index 620e9e8fa5..7e8d9c6341 100644
--- a/docs/en/connector-v2/sink/Doris.md
+++ b/docs/en/connector-v2/sink/Doris.md
@@ -36,7 +36,7 @@ The internal implementation of Doris sink connector is cached and imported by st
| table | String | Yes | - | The table name of `Doris` table, use `${table_name}` to represent the upstream table name |
| table.identifier | String | Yes | - | The name of `Doris` table, it will deprecate after version 2.3.5, please use `database` and `table` instead. |
| sink.label-prefix | String | Yes | - | The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of SeaTunnel. |
-| sink.enable-2pc | bool | No | - | Whether to enable two-phase commit (2pc), the default is true, to ensure Exactly-Once semantics. For two-phase commit, please refer to [here](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD). |
+| sink.enable-2pc | bool | No | false | Whether to enable two-phase commit (2pc). The default is false; enable it to ensure Exactly-Once semantics. For details on two-phase commit, please refer to [here](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD). |
| sink.enable-delete | bool | No | - | Whether to enable deletion. This option requires Doris table to enable batch delete function (0.15+ version is enabled by default), and only supports Unique model. you can get more detail at this [link](https://doris.apache.org/docs/dev/data-operate/update-delete/batch-delete-manual) |
| sink.check-interval | int | No | 10000 | check exception with the interval while loading |
| sink.max-retries | int | No | 3 | the max retry times if writing records to database failed |
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
index 36168c43bb..87f9ddcff8 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java
@@ -152,7 +152,7 @@ public interface DorisOptions {
Option<Boolean> SINK_ENABLE_2PC =
Options.key("sink.enable-2pc")
.booleanType()
- .defaultValue(true)
+ .defaultValue(false)
.withDescription("enable 2PC while loading");
Option<Integer> SINK_CHECK_INTERVAL =
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index 8c945e0fed..323bac17ef 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -116,7 +116,7 @@ public class DorisSinkWriter
}
// get main work thread.
executorThread = Thread.currentThread();
- dorisStreamLoad.startLoad(labelGenerator.generateLabel(lastCheckpointId + 1));
+ startLoad(labelGenerator.generateLabel(lastCheckpointId + 1));
// when uploading data in streaming mode, we need to regularly detect whether there are
// exceptions.
scheduledExecutorService.scheduleWithFixedDelay(
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
index f8d48e465b..458a900b4b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
@@ -85,7 +85,7 @@ public abstract class AbstractDorisIT extends TestSuiteBase implements TestResou
.untilAsserted(this::initializeJdbcConnection);
}
- private void initializeJdbcConnection() throws SQLException {
+ protected void initializeJdbcConnection() throws SQLException {
Properties props = new Properties();
props.put("user", USERNAME);
props.put("password", PASSWORD);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisErrorIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisErrorIT.java
new file mode 100644
index 0000000000..77936f230b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisErrorIT.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.doris;
+
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.given;
+
+@Slf4j
+public class DorisErrorIT extends AbstractDorisIT {
+ // Name of the sink table created by createTableForTest.
+ private static final String TABLE = "doris_e2e_table";
+ // Maven Central URL of the MySQL Connector/J 8.0.32 jar downloaded by extendedFactory.
+ private static final String DRIVER_JAR =
+ "https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar";
+
+ // Name of the database that initializeJdbcTable creates and places the sink table in.
+ private static final String sinkDB = "e2e_sink";
+
+ // Container hook: creates /tmp/seatunnel/plugins/jdbc/lib inside the container and downloads
+ // DRIVER_JAR into it with wget. The assertion fails, reporting the command's stderr, when the
+ // shell command exits with a non-zero code.
+ @TestContainerExtension
+ protected final ContainerExtendedFactory extendedFactory =
+ container -> {
+ Container.ExecResult extraCommands =
+ container.execInContainer(
+ "bash",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/jdbc/lib && cd /tmp/seatunnel/plugins/jdbc/lib && wget "
+ + DRIVER_JAR);
+ Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
+ };
+
+    /**
+     * Checks that a job writing to Doris exits with a non-zero code when the Doris container is
+     * stopped while the job is running.
+     *
+     * <p>The job is submitted asynchronously, the Doris container is stopped after a fixed wait,
+     * and the job's exit code is asserted. The Doris container is restarted in a {@code finally}
+     * block so that a failed assertion (or a failed job submission) does not leave the shared
+     * container stopped.
+     *
+     * @param container engine container used to submit the job
+     * @throws InterruptedException if the waiting thread is interrupted
+     * @throws ExecutionException if the asynchronous job submission itself failed
+     */
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "flink/spark failed reason not same")
+    public void testDoris(TestContainer container) throws InterruptedException, ExecutionException {
+        initializeJdbcTable();
+        CompletableFuture<Container.ExecResult> future =
+                CompletableFuture.supplyAsync(
+                        () -> {
+                            try {
+                                return container.executeJob(
+                                        "/fake_source_and_doris_sink_timeout_error.conf");
+                            } catch (IOException | InterruptedException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+        // wait for the job to start
+        Thread.sleep(10 * 1000);
+        super.container.stop();
+        try {
+            Assertions.assertNotEquals(0, future.get().getExitCode());
+        } finally {
+            // always bring Doris back, even when the assertion above fails
+            super.container.start();
+            // wait for the container to restart
+            given().ignoreExceptions()
+                    .await()
+                    .atMost(10000, TimeUnit.SECONDS)
+                    .untilAsserted(this::initializeJdbcConnection);
+        }
+    }
+
+    /**
+     * Creates the sink database and the sink table through the shared JDBC connection.
+     *
+     * <p>A SQL failure is reported once, as a {@link RuntimeException} with the message
+     * "Initializing table failed!" and the {@link SQLException} as its direct cause, instead of
+     * being wrapped a second time by an outer catch-all.
+     *
+     * @throws RuntimeException if creating the statement, the database or the table fails
+     */
+    private void initializeJdbcTable() {
+        try (Statement statement = jdbcConnection.createStatement()) {
+            // create test databases
+            statement.execute(createDatabase(sinkDB));
+            log.info("create sink database succeed");
+            // create sink table
+            statement.execute(createTableForTest(sinkDB));
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing table failed!", e);
+        }
+    }
+
+    /**
+     * Builds the statement that creates a database if it does not exist yet.
+     *
+     * @param db name of the database to create
+     * @return the CREATE DATABASE statement text
+     */
+    private String createDatabase(String db) {
+        return "CREATE DATABASE IF NOT EXISTS " + db + " ;";
+    }
+
+    /**
+     * Builds the DDL for the sink table {@code TABLE} inside the given database.
+     *
+     * @param db name of the database that will hold the table
+     * @return the CREATE TABLE statement text
+     */
+    private String createTableForTest(String db) {
+        // One element per DDL line; the closing ");" sits on the same line as the last property.
+        String ddlTemplate =
+                String.join(
+                        "\n",
+                        "create table if not exists `%s`.`%s`(",
+                        "F_ID bigint null,",
+                        "F_INT int null,",
+                        "F_BIGINT bigint null,",
+                        "F_TINYINT tinyint null,",
+                        "F_SMALLINT smallint null,",
+                        "F_DECIMAL decimal(18,6) null,",
+                        "F_LARGEINT largeint null,",
+                        "F_BOOLEAN boolean null,",
+                        "F_DOUBLE double null,",
+                        "F_FLOAT float null,",
+                        "F_CHAR char null,",
+                        "F_VARCHAR_11 varchar(11) null,",
+                        "F_STRING string null,",
+                        "F_DATETIME_P datetime(6),",
+                        "F_DATETIME datetime,",
+                        "F_DATE date",
+                        ")",
+                        "duplicate KEY(`F_ID`)",
+                        "DISTRIBUTED BY HASH(`F_ID`) BUCKETS 1",
+                        "properties(",
+                        "\"replication_allocation\" = \"tag.location.default: 1\");");
+        return String.format(ddlTemplate, db, TABLE);
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/fake_source_and_doris_sink_timeout_error.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/fake_source_and_doris_sink_timeout_error.conf
new file mode 100644
index 0000000000..53e0eadaa4
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/fake_source_and_doris_sink_timeout_error.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env{
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source{
+ FakeSource {
+ row.num = 100
+ split.num = 10
+ split.read-interval = 10000
+ string.length = 1
+ schema = {
+ fields {
+ F_ID = "string"
+ F_INT = "int"
+ F_BIGINT = "time"
+ F_TINYINT = "tinyint"
+ F_SMALLINT = "smallint"
+ F_DECIMAL = "decimal(10,2)"
+ F_LARGEINT = "bigint"
+ F_BOOLEAN = "boolean"
+ F_DOUBLE = "double"
+ F_FLOAT = "float"
+ F_CHAR = "string"
+ F_VARCHAR_11 = "string"
+ F_STRING = "string"
+ F_DATETIME_P = "timestamp"
+ F_DATETIME = "timestamp"
+ F_DATE = "date"
+ }
+ }
+ }
+}
+
+transform {}
+
+sink{
+ Doris {
+ fenodes = "doris_e2e:8030"
+ username = root
+ password = ""
+ table.identifier = "e2e_sink.doris_e2e_table"
+ sink.enable-2pc = "true"
+ sink.label-prefix = "test_json"
+ doris.config = {
+ format="json"
+ read_json_by_line="true"
+ }
+ }
+ }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index 82e8e79c11..91320b3d9a 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -155,7 +155,7 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
@Override
public String getServerLogs() {
- return jobManager.getLogs();
+ return jobManager.getLogs() + "\n" + taskManager.getLogs();
}
public String executeJobManagerInnerCommand(String command)