You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/18 09:34:08 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] StarRocks Source & Sink connector (#3060)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 44ee9a8ad [Feature][Connector-V2] StarRocks Source & Sink connector (#3060)
44ee9a8ad is described below

commit 44ee9a8ad8ad9031b418d3a2a3ceff87bc236526
Author: Bibo <33...@users.noreply.github.com>
AuthorDate: Tue Oct 18 17:34:03 2022 +0800

    [Feature][Connector-V2] StarRocks Source & Sink connector (#3060)
    
    * [Feature][Connector-V2] StarRocks connector
    * improve starrocks e2e
---
 docs/en/connector-v2/sink/Jdbc.md                  |   1 +
 docs/en/connector-v2/source/Jdbc.md                |   1 +
 .../seatunnel/jdbc/JdbcStarRocksdbIT.java          | 162 +++++++++++++++++++++
 .../resources/jdbc_starrocks_source_to_sink.conf   |  45 ++++++
 4 files changed, 209 insertions(+)

diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md
index 85811b1c4..989e69b2c 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -116,6 +116,7 @@ there are some reference value for params above.
 | phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /                                                  | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
 | sqlserver  | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433                    | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
 | oracle     | oracle.jdbc.OracleDriver                     | jdbc:oracle:thin:@localhost:1521/xepdb1                      | oracle.jdbc.xa.OracleXADataSource                  | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
+| starrocks  | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:9030/test                             | /                                                  | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
 
 ## Example
 
diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md
index cd5746201..0d3ca3981 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -98,6 +98,7 @@ there are some reference value for params above.
 | phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
 | sqlserver  | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433                    | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
 | oracle     | oracle.jdbc.OracleDriver                     | jdbc:oracle:thin:@localhost:1521/xepdb1                      | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
+| starrocks  | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:9030/test                             | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
 
 ## Example
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
new file mode 100644
index 000000000..90b062985
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.util.JdbcCompareUtil;
+
+import org.junit.jupiter.api.Assertions;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+
+public class JdbcStarRocksdbIT extends AbstractJdbcIT {
+
+    private static final String DOCKER_IMAGE = "d87904488/starrocks-starter:2.2.1";
+    private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
+    private static final String NETWORK_ALIASES = "e2e_starRocksdb";
+    private static final int SR_PORT = 9030;
+    private static final String USERNAME = "root";
+    private static final String PASSWORD = "";
+    private static final String DATABASE = "test";
+    private static final String URL = "jdbc:mysql://" + HOST + ":" + SR_PORT + "/" + DATABASE + "?createDatabaseIfNotExist=true";
+
+    private static final String SOURCE_TABLE = "e2e_table_source";
+    private static final String SINK_TABLE = "e2e_table_sink";
+    private static final String SR_DRIVER_JAR = "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
+    private static final String COLUMN_STRING = "BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL";
+    private static final String CONFIG_FILE = "/jdbc_starrocks_source_to_sink.conf";
+
+    private static final String DDL_SOURCE = "create table "  + DATABASE + "." + SOURCE_TABLE + " (\n" +
+            "  BIGINT_COL     BIGINT,\n" +
+            "  LARGEINT_COL   LARGEINT,\n" +
+            "  SMALLINT_COL   SMALLINT,\n" +
+            "  TINYINT_COL    TINYINT,\n" +
+            "  BOOLEAN_COL    BOOLEAN,\n" +
+            "  DECIMAL_COL    DECIMAL,\n" +
+            "  DOUBLE_COL     DOUBLE,\n" +
+            "  FLOAT_COL      FLOAT,\n" +
+            "  INT_COL        INT,\n" +
+            "  CHAR_COL       CHAR,\n" +
+            "  VARCHAR_11_COL VARCHAR(11),\n" +
+            "  STRING_COL     STRING,\n" +
+            "  DATETIME_COL   DATETIME,\n" +
+            "  DATE_COL       DATE\n" +
+            ")ENGINE=OLAP\n"  +
+            "DUPLICATE KEY(`BIGINT_COL`)\n" +
+            "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" +
+            "PROPERTIES (\n" +
+            "\"replication_num\" = \"1\",\n" +
+            "\"in_memory\" = \"false\"," +
+            "\"in_memory\" = \"false\"," +
+            "\"storage_format\" = \"DEFAULT\"" +
+            ")";
+
+
+    private static final String DDL_SINK = "create table "   + DATABASE + "." + SINK_TABLE + " (\n" +
+            "  BIGINT_COL     BIGINT,\n" +
+            "  LARGEINT_COL   LARGEINT,\n" +
+            "  SMALLINT_COL   SMALLINT,\n" +
+            "  TINYINT_COL    TINYINT,\n" +
+            "  BOOLEAN_COL    BOOLEAN,\n" +
+            "  DECIMAL_COL    DECIMAL,\n" +
+            "  DOUBLE_COL     DOUBLE,\n" +
+            "  FLOAT_COL      FLOAT,\n" +
+            "  INT_COL        INT,\n" +
+            "  CHAR_COL       CHAR,\n" +
+            "  VARCHAR_11_COL VARCHAR(11),\n" +
+            "  STRING_COL     STRING,\n" +
+            "  DATETIME_COL   DATETIME,\n" +
+            "  DATE_COL       DATE\n" +
+            ")ENGINE=OLAP\n"  +
+            "DUPLICATE KEY(`BIGINT_COL`)\n" +
+            "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" +
+            "PROPERTIES (\n" +
+            "\"replication_num\" = \"1\",\n" +
+            "\"in_memory\" = \"false\"," +
+            "\"in_memory\" = \"false\"," +
+            "\"storage_format\" = \"DEFAULT\"" +
+            ")";
+
+    private static final String INIT_DATA_SQL = "insert into "   + DATABASE + "." + SOURCE_TABLE + " (\n" +
+            "  BIGINT_COL,\n" +
+            "  LARGEINT_COL,\n" +
+            "  SMALLINT_COL,\n" +
+            "  TINYINT_COL,\n" +
+            "  BOOLEAN_COL,\n" +
+            "  DECIMAL_COL,\n" +
+            "  DOUBLE_COL,\n" +
+            "  FLOAT_COL,\n" +
+            "  INT_COL,\n" +
+            "  CHAR_COL,\n" +
+            "  VARCHAR_11_COL,\n" +
+            "  STRING_COL,\n" +
+            "  DATETIME_COL,\n" +
+            "  DATE_COL\n" +
+            ")values(\n" +
+            "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?\n" +
+            ")";
+
+    @Override
+    JdbcCase getJdbcCase() {
+        Map<String, String> containerEnv = new HashMap<>();
+        String jdbcUrl = String.format(URL, SR_PORT);
+        return JdbcCase.builder().dockerImage(DOCKER_IMAGE).networkAliases(NETWORK_ALIASES).containerEnv(containerEnv).driverClass(DRIVER_CLASS)
+                .host(HOST).port(SR_PORT).jdbcUrl(jdbcUrl).userName(USERNAME).password(PASSWORD).dataBase(DATABASE)
+                .sourceTable(SOURCE_TABLE).sinkTable(SINK_TABLE).driverJar(SR_DRIVER_JAR)
+                .ddlSource(DDL_SOURCE).ddlSink(DDL_SINK).initDataSql(INIT_DATA_SQL).configFile(CONFIG_FILE).seaTunnelRow(initTestData()).build();
+    }
+
+    @Override
+    void compareResult() throws SQLException, IOException {
+        assertHasData(SOURCE_TABLE);
+        assertHasData(SINK_TABLE);
+        JdbcCompareUtil.compare(jdbcConnection, String.format("select * from %s.%s limit 1", DATABASE, SOURCE_TABLE),
+                String.format("select * from %s.%s limit 1", DATABASE, SINK_TABLE), COLUMN_STRING);
+        clearSinkTable();
+    }
+
+    @Override
+    void clearSinkTable() {
+        try (Statement statement = jdbcConnection.createStatement()) {
+            statement.execute(String.format("TRUNCATE TABLE %s", DATABASE + "." + SINK_TABLE));
+        } catch (SQLException e) {
+            throw new RuntimeException("test starrocks server image error", e);
+        }
+    }
+
+    @Override
+    SeaTunnelRow initTestData() {
+        return new SeaTunnelRow(
+                new Object[]{1234, 1123456, 12, 1, 0, 2222243.2222243, 2222243.22222, 1.22222, 12, "a", "VARCHAR_COL", "STRING_COL", "2022-08-13 17:35:59", "2022-08-13"});
+    }
+
+    private void assertHasData(String table) {
+        try (Statement statement = jdbcConnection.createStatement()) {
+            String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
+            ResultSet source = statement.executeQuery(sql);
+            Assertions.assertTrue(source.next());
+        } catch (SQLException e) {
+            throw new RuntimeException("test starrocks server image error", e);
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_starrocks_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_starrocks_source_to_sink.conf
new file mode 100644
index 000000000..056cf47db
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_starrocks_source_to_sink.conf
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Jdbc {
+    driver = com.mysql.cj.jdbc.Driver
+    url = "jdbc:mysql://e2e_starRocksdb:9030"
+    user = root
+    password = ""
+    query = "select BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL from `test`.`e2e_table_source`"
+  }
+}
+
+transform {
+
+}
+
+sink {
+  Jdbc {
+    driver = com.mysql.cj.jdbc.Driver
+    url = "jdbc:mysql://e2e_starRocksdb:9030"
+    user = root
+    password = ""
+    query = "INSERT INTO `test`.`e2e_table_sink` (BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+  }
+}