You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/08/06 10:19:22 UTC

[GitHub] [incubator-seatunnel] laglangyue opened a new pull request, #2377: Jdbc dm

laglangyue opened a new pull request, #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377

   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/incubator-seatunnel/issues).
   
     - Name the pull request in the form "[Feature] [component] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   
   ## Purpose of this pull request
   
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   
   ## Check list
   
   * [ ] Code changed are covered with tests, or it does not need tests for reason:
   * [ ] If any new Jar binary package adding in your PR, please add License Notice according
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] CalvinKirs commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1207537200

    If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] laglangyue commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
laglangyue commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1236110721

   ![image](https://user-images.githubusercontent.com/35491928/188270558-93b29da0-5943-4dc4-bccd-2b423d5062d7.png)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] laglangyue commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
laglangyue commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1211963812

   > Have you tested XA-Transaction?, This will be used in exactly-once Sink. And add how to configure exactly-once sink in the document
   
   Thx for your guide, I will do it this weekend. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] laglangyue commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
laglangyue commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1236355001

   I has fix the date,time,timestamp convert in other PR,and the e2e test is finished.
   the dirver of DM is Apache license in maven-center-reponsitory, so I add it. such as mysql/pg, how do you think it.
   In my opinion, I will remove the driver from connector,and add dirver to e2e when the PR #2640 is merged.
   Please review it angin @hailin0 @ic4y 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 merged pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
EricJoy2048 merged PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] laglangyue commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
laglangyue commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1238178946

   @ic4y @Hisoka-X @CalvinKirs  I has rebased it to dev, and change dependencies,Please review again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
hailin0 commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1221482099

   > The PostgreSQL driver is directly introduced and introduced into the Lib of connector-v2. I am afraid this is not appropriate.
   
   @CalvinKirs Do you have any suggestions


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] laglangyue commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
laglangyue commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1208024015

   > > I imitate MySqlCatalog to DMCatalog, but I don't know it work. and the workflows is failed, I need some time to solve it,and I try to learn know how to do it rencently.
   > 
   > Ask any questions at any time.
   
   
   TypeMapper is in the method 
   org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.MySqlCatalog#fromJdbcType
   
   but also in this method
   org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeMapper
   
   Any difference?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] laglangyue commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
laglangyue commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1236110835

   flink job is finish and there ara not any error log,but sink not has data


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
ic4y commented on code in PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#discussion_r964341373


##########
seatunnel-dist/release-docs/licenses/LICENSE-DMjdbc.txt:
##########
@@ -0,0 +1,202 @@
+

Review Comment:
   And do not need this file



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcDmdbIT extends FlinkContainer {
+
+    private static final String DOCKER_IMAGE = "laglangyue/dmdb8";
+    private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
+    private static final String HOST = "flink_e2e_dmdb";
+    private static final String LOCAL_HOST = "localhost";
+    private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
+
+    private static final String USERNAME = "SYSDBA";
+    private static final String PASSWORD = "SYSDBA";
+    private static final String DATABASE = "SYSDBA";
+    private static final String SOURCE_TABLE = "e2e_table_source";
+    private static final String SINK_TABLE = "e2e_table_sink";
+    private Connection jdbcConnection;
+    private GenericContainer<?> server;
+
+    @BeforeEach
+    public void startGreenplumContainer() throws ClassNotFoundException, SQLException {
+        server = new GenericContainer<>(DOCKER_IMAGE)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(HOST)
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        server.setPortBindings(Lists.newArrayList(
+            String.format("%s:%s", 5236, 5236)));
+        Startables.deepStart(Stream.of(server)).join();
+        log.info("Greenplum container started");
+        // wait for Greenplum fully start
+        Class.forName(DRIVER_CLASS);
+        given().ignoreExceptions()
+            .await()
+            .atMost(180, TimeUnit.SECONDS)
+            .untilAsserted(this::initializeJdbcConnection);
+        initializeJdbcTable();
+    }
+
+    private void initializeJdbcConnection() throws SQLException {
+        jdbcConnection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
+    }
+
+    /**
+     * init the table for DM_SERVER, DDL and DML for source and sink
+     */
+    private void initializeJdbcTable() {
+        java.net.URL resource = FlinkContainer.class.getResource("/jdbc/init_sql/dm_init.conf");
+        if (resource == null) {
+            throw new IllegalArgumentException("can't find find file");
+        }
+        String file = resource.getFile();
+        Config config = ConfigFactory.parseFile(new File(file));
+        assert config.hasPath("dm_table_source") && config.hasPath("DML") && config.hasPath("dm_table_sink");
+        try (Statement statement = jdbcConnection.createStatement()) {
+            // source
+            String sourceTableDDL = config.getString("dm_table_source");
+            statement.execute(sourceTableDDL);
+            String insertSQL = config.getString("DML");
+            statement.execute(insertSQL);
+            // sink
+            String sinkTableDDL = config.getString("dm_table_sink");
+            statement.execute(sinkTableDDL);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing table failed!", e);
+        }
+    }
+
+    private void assertHasData(String table) {
+        try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
+            Statement statement = connection.createStatement();
+            String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
+            ResultSet source = statement.executeQuery(sql);
+            Assertions.assertTrue(source.next());
+        } catch (SQLException e) {
+            throw new RuntimeException("test dm server image error", e);
+        }
+    }
+
+    @AfterEach
+    public void closeGreenplumContainer() throws SQLException {
+        if (jdbcConnection != null) {
+            jdbcConnection.close();
+        }
+    }
+
+    @Test
+    @DisplayName("JDBC-DM container can be pull")
+    public void testDMDBImage() {
+        assertHasData(SOURCE_TABLE);
+    }
+
+    @Test
+    @DisplayName("flink JDBC-DM test")
+    public void testJdbcDmdbSourceAndSink() throws IOException, InterruptedException, SQLException {
+        assertHasData(SOURCE_TABLE);
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_dm_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        assertHasData(SINK_TABLE);

Review Comment:
   Suggest check whether the data in the two tables are the same



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JDBCDmdbIT.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JDBCDmdbIT extends SparkContainer {
+
+    private static final String DM_DOCKER_IMAGE = "laglangyue/dmdb8";
+    private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
+    private static final String HOST = "spark_e2e_dmdb";
+    private static final String LOCAL_HOST = "localhost";
+    private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
+    private static final String USERNAME = "SYSDBA";
+    private static final String PASSWORD = "SYSDBA";
+    private static final String DATABASE = "SYSDBA";
+    private static final String SOURCE_TABLE = "e2e_table_source";
+    private static final String SINK_TABLE = "e2e_table_sink";
+    private GenericContainer<?> dmServer;
+    private Connection jdbcConnection;
+
+    @BeforeEach
+    public void beforeAllForDM() {
+        try {
+            dmServer = new GenericContainer<>(DM_DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+            dmServer.setPortBindings(Lists.newArrayList("5236:5236"));
+            Startables.deepStart(Stream.of(dmServer)).join();
+            log.info("dm container started");
+            Class.forName(DRIVER_CLASS);
+            given().ignoreExceptions()
+                .await()
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initializeJdbcConnection);
+            initializeJdbcTable();
+        } catch (Exception ex) {
+            log.error("dm container init failed", ex);
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @AfterEach
+    public void closeGreenplumContainer() throws SQLException {
+        if (jdbcConnection != null) {
+            jdbcConnection.close();
+        }
+    }
+
+    private void initializeJdbcConnection() throws SQLException {
+        jdbcConnection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
+    }
+
+    /**
+     * init the table for DM_SERVER, DDL and DML for source and sink
+     */
+    private void initializeJdbcTable() {
+        URL resource = JDBCDmdbIT.class.getResource("/jdbc/init_sql/dm_init.conf");
+        if (resource == null) {
+            throw new IllegalArgumentException("can't find find file");
+        }
+        String file = resource.getFile();
+        Config config = ConfigFactory.parseFile(new File(file));
+        assert config.hasPath("dm_table_source") && config.hasPath("DML") && config.hasPath("dm_table_sink");
+        try (Statement statement = jdbcConnection.createStatement()) {
+            // source
+            String sourceTableDDL = config.getString("dm_table_source");
+            statement.execute(sourceTableDDL);
+            String insertSQL = config.getString("DML");
+            statement.execute(insertSQL);
+            // sink
+            String sinkTableDDL = config.getString("dm_table_sink");
+            statement.execute(sinkTableDDL);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing table failed!", e);
+        }
+    }
+
+    private void assertHasData(String table) {
+        try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
+            Statement statement = connection.createStatement();
+            String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
+            ResultSet source = statement.executeQuery(sql);
+            Assertions.assertTrue(source.next());
+        } catch (SQLException e) {
+            throw new RuntimeException("test dm server image error", e);
+        }
+    }
+
+    @Test
+    @DisplayName("JDBC-DM container can be pull")
+    public void testDMDBImage() {
+        assertHasData(SOURCE_TABLE);
+    }
+
+    @Test
+    @DisplayName("spark JDBC-DM test for all type mapper")
+    public void testDMDBSourceToJdbcSink() throws SQLException, IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_dm_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        assertHasData(SINK_TABLE);

Review Comment:
   as above



##########
tools/dependencies/known-dependencies.txt:
##########
@@ -736,4 +736,5 @@ zstd-jni-1.3.3-1.jar
 zstd-jni-1.4.3-1.jar
 jakarta.activation-api-1.2.1.jar
 jakarta.xml.bind-api-2.3.2.jar
-postgresql-42.3.3.jar
\ No newline at end of file
+postgresql-42.3.3.jar
+DmJdbcDriver18-8.1.2.141.jar

Review Comment:
   Now the connector and e2e do not need to add a license, delete it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ic4y commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
ic4y commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1207583912

   
   > I imitate MySqlCatalog to DMCatalog, but I don't know it work. and the workflows is failed, I need some time to solve it,and I try to learn know how to do it rencently.
   
   Ask any questions at any time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] laglangyue commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
laglangyue commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1208921975

   > DM can also temporarily not implement catalog
   
   thx,I get , I will modify my code this night


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] CalvinKirs commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1207537304

   @ic4y PTAL,thx


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] laglangyue commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
laglangyue commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1224101214

   ![image](https://user-images.githubusercontent.com/35491928/186173370-8cd1e3ea-2aa9-480f-b9b7-821de51091b9.png)
   I have 2 ideas to solve it.
   1. `maven-dependency-plugin` generate the dir such lib include all dirver-jar, and copy it to container
   2. Run the command in container ` wget https://repo1.maven.org/maven2/com/dameng/DmJdbcDriver18/8.1.2.141/DmJdbcDriver18-8.1.2.141.jar`,and copy it to lib.
   
   In our pom just need to provided scope
   @CalvinKirs 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] laglangyue commented on a diff in pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
laglangyue commented on code in PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#discussion_r965157543


##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JDBCDmdbIT.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JDBCDmdbIT extends SparkContainer {
+
+    private static final String DM_DOCKER_IMAGE = "laglangyue/dmdb8";
+    private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
+    private static final String HOST = "spark_e2e_dmdb";
+    private static final String LOCAL_HOST = "localhost";
+    private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
+    private static final String USERNAME = "SYSDBA";
+    private static final String PASSWORD = "SYSDBA";
+    private static final String DATABASE = "SYSDBA";
+    private static final String SOURCE_TABLE = "e2e_table_source";
+    private static final String SINK_TABLE = "e2e_table_sink";
+    private GenericContainer<?> dmServer;
+    private Connection jdbcConnection;
+
+    @BeforeEach
+    public void beforeAllForDM() {
+        try {
+            dmServer = new GenericContainer<>(DM_DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+            dmServer.setPortBindings(Lists.newArrayList("5236:5236"));
+            Startables.deepStart(Stream.of(dmServer)).join();
+            log.info("dm container started");
+            Class.forName(DRIVER_CLASS);
+            given().ignoreExceptions()
+                .await()
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initializeJdbcConnection);
+            initializeJdbcTable();
+        } catch (Exception ex) {
+            log.error("dm container init failed", ex);
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @AfterEach
+    public void closeGreenplumContainer() throws SQLException {
+        if (jdbcConnection != null) {
+            jdbcConnection.close();
+        }
+    }
+
+    private void initializeJdbcConnection() throws SQLException {
+        jdbcConnection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
+    }
+
+    /**
+     * init the table for DM_SERVER, DDL and DML for source and sink
+     */
+    private void initializeJdbcTable() {
+        URL resource = JDBCDmdbIT.class.getResource("/jdbc/init_sql/dm_init.conf");
+        if (resource == null) {
+            throw new IllegalArgumentException("can't find find file");
+        }
+        String file = resource.getFile();
+        Config config = ConfigFactory.parseFile(new File(file));
+        assert config.hasPath("dm_table_source") && config.hasPath("DML") && config.hasPath("dm_table_sink");
+        try (Statement statement = jdbcConnection.createStatement()) {
+            // source
+            String sourceTableDDL = config.getString("dm_table_source");
+            statement.execute(sourceTableDDL);
+            String insertSQL = config.getString("DML");
+            statement.execute(insertSQL);
+            // sink
+            String sinkTableDDL = config.getString("dm_table_sink");
+            statement.execute(sinkTableDDL);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing table failed!", e);
+        }
+    }
+
+    private void assertHasData(String table) {
+        try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
+            Statement statement = connection.createStatement();
+            String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
+            ResultSet source = statement.executeQuery(sql);
+            Assertions.assertTrue(source.next());
+        } catch (SQLException e) {
+            throw new RuntimeException("test dm server image error", e);
+        }
+    }
+
+    @Test
+    @DisplayName("JDBC-DM container can be pull")
+    public void testDMDBImage() {
+        assertHasData(SOURCE_TABLE);
+    }

Review Comment:
   this case is test the DDL/DML for Dmdb is ok. if someone modify the DDL/DML ,this testcase can work



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcDmdbIT extends FlinkContainer {
+
+    private static final String DOCKER_IMAGE = "laglangyue/dmdb8";
+    private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
+    private static final String HOST = "flink_e2e_dmdb";
+    private static final String LOCAL_HOST = "localhost";
+    private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
+
+    private static final String USERNAME = "SYSDBA";
+    private static final String PASSWORD = "SYSDBA";
+    private static final String DATABASE = "SYSDBA";
+    private static final String SOURCE_TABLE = "e2e_table_source";
+    private static final String SINK_TABLE = "e2e_table_sink";
+    private Connection jdbcConnection;
+    private GenericContainer<?> server;
+
+    @BeforeEach
+    public void startGreenplumContainer() throws ClassNotFoundException, SQLException {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] laglangyue commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
laglangyue commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1208343749

   another question, Mysql has the catalog named MysqlCatalog ,and why does PostgreSQL have no catalog such as PostGresqlCatalog?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ic4y commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
ic4y commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1208842440

   > another question, Mysql has the catalog named MysqlCatalog ,and why does PostgreSQL have no catalog such as PostGresqlCatalog?
   
   Because only the catalog of mysql is currently implemented, pg has not yet been implemented. DM can also temporarily not implement catalog


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1237607575

   @ic4y Hi, PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ic4y commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
ic4y commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1240559133

   LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ic4y commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
ic4y commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1211667965

   Have you tested XA-Transaction?, This will be used in exactly-once Sink. And add how to configure exactly-once  sink  in the document


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] laglangyue commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
laglangyue commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1221266380

   我遇到了一个困难,postgresql的驱动是直接引入的,并且被引入到connector-v2的lib中,这恐怕不合适。
   在e2e test,需要一个驱动在docker里面,我在本地可以手动复制到connector-v2-dist的lib文件夹中,但是在ci环境或者别人去运行将会failed。
   所以我想,是否需要一个额外的模块,类似e2e-driver-dist,提供给e2e test,复制到docker中。


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] CalvinKirs commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
CalvinKirs commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1229808429

   > 
   
   In fact, we could have a dist directory dedicated to these jars if we really needed to. (if necessary), but it seems that they are all jdbc driven, as a temporary solution, you can also use the second one. Of course, how you do it is up to you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ic4y commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
ic4y commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1214560829

   @laglangyue Wait for the JDBC e2e test (#2321) to merge. You need to add test for DM in jdbc e2e test


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
ic4y commented on code in PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#discussion_r943182844


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbTypeMapper.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class DmdbTypeMapper implements JdbcDialectTypeMapper {
+
+    // ============================data types=====================
+    private static final String DM_BIT = "BIT";
+
+    // ----------------------------number-------------------------
+    private static final String DM_NUMERIC = "NUMERIC";
+    private static final String DM_NUMBER = "NUMBER";
+    private static final String DM_DECIMAL = "DECIMAL";
+    /**
+     * same to DECIMAL
+     */
+    private static final String DM_DEC = "DEC";
+
+    // ----------------------------int-----------------------------
+    private static final String DM_INTEGER = "INTEGER";
+    private static final String DM_INT = "INT";
+    private static final String DM_BIGINT = "BIGINT";
+    private static final String DM_TINYINT = "TINYINT";
+    private static final String DM_BYTE = "BYTE";
+    private static final String DM_SMALLINT = "SMALLINT";
+
+    // dm float is double for Cpp.
+    private static final String DM_FLOAT = "FLOAT";
+    private static final String DM_DOUBLE = "DOUBLE";
+    private static final String DM_DOUBLE_PRECISION = "DOUBLE PRECISION";
+    private static final String DM_REAL = "REAL";
+
+    // DM_CHAR DM_CHARACTER DM_VARCHAR DM_VARCHAR2 max is 32767
+    private static final String DM_CHAR = "CHAR";
+    private static final String DM_CHARACTER = "CHARACTER";
+    private static final String DM_VARCHAR = "VARCHAR";
+    private static final String DM_VARCHAR2 = "VARCHAR2";
+    private static final String DM_LONGVARCHAR = "LONGVARCHAR";
+    private static final String DM_CLOB = "CLOB";
+    private static final String DM_TEXT = "TEXT";
+    private static final String DM_LONG = "LONG";
+
+    // ------------------------------time-------------------------
+    private static final String DM_DATE = "DATE";
+    private static final String DM_TIME = "TIME";
+    private static final String DM_TIMESTAMP = "TIMESTAMP";
+
+    // ---------------------------binary---------------------------
+    private static final String DM_BINARY = "BINARY";
+    private static final String DM_VARBINARY = "VARBINARY";
+
+    // -------------------------time interval-----------------------
+    private static final String DM_INTERVAL_YEAR_TO_MONTH = "INTERVAL YEAR TO MONTH";
+    private static final String DM_INTERVAL_YEAR = "INTERVAL YEAR";
+    private static final String DM_INTERVAL_MONTH = "INTERVAL MONTH";
+    private static final String DM_INTERVAL_DAY = "INTERVAL DAY";
+    private static final String DM_INTERVAL_DAY_TO_HOUR = "INTERVAL DAY TO HOUR";
+    private static final String DM_INTERVAL_DAY_TO_MINUTE = "INTERVAL DAY TO MINUTE";
+    private static final String DM_INTERVAL_DAY_TO_SECOND = "INTERVAL DAY TO SECOND";
+    private static final String DM_INTERVAL_HOUR = "INTERVAL HOUR";
+    private static final String DM_INTERVAL_HOUR_TO_MINUTE = "INTERVAL HOUR TO MINUTE";
+    private static final String DM_INTERVAL_HOUR_TO_SECOND = "INTERVAL HOUR TO SECOND";
+    private static final String DM_INTERVAL_MINUTE = "INTERVAL MINUTE";
+    private static final String DM_INTERVAL_MINUTE_TO_SECOND = "INTERVAL MINUTE TO SECOND";
+    private static final String DM_INTERVAL_SECOND = "INTERVAL SECOND";
+    // time zone
+    private static final String DM_TIME_WITH_TIME_ZONE = "TIME WITH TIME ZONE";
+    private static final String DM_TIMESTAMP_WITH_TIME_ZONE = "TIMESTAMP WITH TIME ZONE";
+    private static final String TIMESTAMP_WITH_LOCAL_TIME_ZONE = "TIMESTAMP WITH LOCAL TIME ZONE";
+
+    // ------------------------------blob-------------------------
+    public static final String DM_BLOB = "BLOB";
+    public static final String DM_BFILE = "BFILE";
+    public static final String DM_IMAGE = "IMAGE";
+    public static final String DM_LONGVARBINARY = "LONGVARBINARY";
+
+    @Override
+    public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
+        String dmdbType = metadata.getColumnTypeName(colIndex).toUpperCase();
+        //  String columnName = metadata.getColumnName(colIndex);

Review Comment:
   Delete it if you don't need it



##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbJdbcRowConverter.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class DmdbJdbcRowConverter extends AbstractJdbcRowConverter {
+
+    @Override
+    public String converterName() {
+        return "Dmdb";
+    }
+
+    @Override
+    public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException {
+        return super.toInternal(rs, metaData, typeInfo);

Review Comment:
   Is it OK to use the default `toInternal`?, it is recommended to test the covered data types locally



##########
pom.xml:
##########
@@ -352,6 +353,13 @@
                 <scope>test</scope>
             </dependency>
 
+            <dependency>
+                <groupId>com.dameng</groupId>
+                <artifactId>DmJdbcDriver18</artifactId>
+                <version>${dm-jdbc.version}</version>
+                <scope>provided</scope>

Review Comment:
   this `scope` need to be `test`. We don't want to include DmJdbcDriver18.jar in the connector's package



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] laglangyue commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
laglangyue commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1207190820

   I imitate MySqlCatalog to DMCatalog, but I don't know it work.
   and the workflows is failed, I need some time to solve it,and I try to learn know how to do it rencently.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] laglangyue commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
laglangyue commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1237013590

   > Please fixed checkstyle problem and conflct
   
   done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] laglangyue commented on a diff in pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
laglangyue commented on code in PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#discussion_r965150334


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcDmdbIT extends FlinkContainer {
+
+    private static final String DOCKER_IMAGE = "laglangyue/dmdb8";
+    private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
+    private static final String HOST = "flink_e2e_dmdb";
+    private static final String LOCAL_HOST = "localhost";
+    private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
+
+    private static final String USERNAME = "SYSDBA";
+    private static final String PASSWORD = "SYSDBA";
+    private static final String DATABASE = "SYSDBA";
+    private static final String SOURCE_TABLE = "e2e_table_source";
+    private static final String SINK_TABLE = "e2e_table_sink";
+    private Connection jdbcConnection;
+    private GenericContainer<?> server;
+
+    @BeforeEach
+    public void startGreenplumContainer() throws ClassNotFoundException, SQLException {
+        server = new GenericContainer<>(DOCKER_IMAGE)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(HOST)
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        server.setPortBindings(Lists.newArrayList(
+            String.format("%s:%s", 5236, 5236)));
+        Startables.deepStart(Stream.of(server)).join();
+        log.info("Greenplum container started");
+        // wait for Greenplum fully start
+        Class.forName(DRIVER_CLASS);
+        given().ignoreExceptions()
+            .await()
+            .atMost(180, TimeUnit.SECONDS)
+            .untilAsserted(this::initializeJdbcConnection);
+        initializeJdbcTable();
+    }
+
+    private void initializeJdbcConnection() throws SQLException {
+        jdbcConnection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
+    }
+
+    /**
+     * init the table for DM_SERVER, DDL and DML for source and sink
+     */
+    private void initializeJdbcTable() {
+        java.net.URL resource = FlinkContainer.class.getResource("/jdbc/init_sql/dm_init.conf");
+        if (resource == null) {
+            throw new IllegalArgumentException("can't find find file");
+        }
+        String file = resource.getFile();
+        Config config = ConfigFactory.parseFile(new File(file));
+        assert config.hasPath("dm_table_source") && config.hasPath("DML") && config.hasPath("dm_table_sink");
+        try (Statement statement = jdbcConnection.createStatement()) {
+            // source
+            String sourceTableDDL = config.getString("dm_table_source");
+            statement.execute(sourceTableDDL);
+            String insertSQL = config.getString("DML");
+            statement.execute(insertSQL);
+            // sink
+            String sinkTableDDL = config.getString("dm_table_sink");
+            statement.execute(sinkTableDDL);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing table failed!", e);
+        }
+    }
+
+    private void assertHasData(String table) {
+        try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
+            Statement statement = connection.createStatement();
+            String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
+            ResultSet source = statement.executeQuery(sql);
+            Assertions.assertTrue(source.next());
+        } catch (SQLException e) {
+            throw new RuntimeException("test dm server image error", e);
+        }
+    }
+
+    @AfterEach
+    public void closeGreenplumContainer() throws SQLException {
+        if (jdbcConnection != null) {
+            jdbcConnection.close();
+        }
+    }
+
+    @Test
+    @DisplayName("JDBC-DM container can be pull")
+    public void testDMDBImage() {
+        assertHasData(SOURCE_TABLE);
+    }
+
+    @Test
+    @DisplayName("flink JDBC-DM test")
+    public void testJdbcDmdbSourceAndSink() throws IOException, InterruptedException, SQLException {
+        assertHasData(SOURCE_TABLE);
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_dm_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        assertHasData(SINK_TABLE);

Review Comment:
   I Ihave debugged to check. Verifying the data consistency between sink and source is a public Issue.. I hope to discuss with you in another PR, such as how to preset data and assert verification. I think this is required for all E2E use cases. 
   Currently, only DM has preset SQL for all fields, and other E2E has only a few fields



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] laglangyue commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
laglangyue commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1207688593

   > If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   
   ok,I will lately


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ic4y commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
ic4y commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1208840586

   > > > I imitate MySqlCatalog to DMCatalog, but I don't know it work. and the workflows is failed, I need some time to solve it,and I try to learn know how to do it rencently.
   > > 
   > > 
   > > Ask any questions at any time.
   > 
   > TypeMapper is in the method org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.MySqlCatalog#fromJdbcType
   > 
   > but also in this method org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeMapper
   > 
   > Any difference?
   
   There is really no difference between the two, This is indeed repeated in the implementation of the `catalog`. However, the catalog has not been started yet, so it is not necessary to implement the `catalog` related things.
   
   Refer to the implementation under the `jdbc.internal.dialect` package.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#issuecomment-1236562277

   Please fixed checkstyle problem and conflct


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] laglangyue commented on a diff in pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
laglangyue commented on code in PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#discussion_r963682067


##########
pom.xml:
##########
@@ -220,7 +221,11 @@
                 <artifactId>postgresql</artifactId>
                 <version>${postgresql.version}</version>
             </dependency>
-
+            <dependency>

Review Comment:
   > Hi, please move connector dependency to connector's pom.xml. Reference: #2630
   
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#discussion_r964555824


##########
seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml:
##########
@@ -76,6 +76,11 @@
             <artifactId>connector-dingtalk</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-jdbc</artifactId>
+            <version>${project.version}</version>
+        </dependency>

Review Comment:
   revert?



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml:
##########
@@ -55,6 +55,7 @@
                 </exclusion>
             </exclusions>
         </dependency>
+

Review Comment:
   revert?



##########
seatunnel-examples/seatunnel-spark-connector-v2-example/pom.xml:
##########
@@ -55,6 +55,11 @@
             <artifactId>connector-clickhouse</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-jdbc</artifactId>
+            <version>${project.version}</version>
+        </dependency>

Review Comment:
   revert?



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JDBCDmdbIT.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JDBCDmdbIT extends SparkContainer {
+
+    private static final String DM_DOCKER_IMAGE = "laglangyue/dmdb8";
+    private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
+    private static final String HOST = "spark_e2e_dmdb";
+    private static final String LOCAL_HOST = "localhost";
+    private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
+    private static final String USERNAME = "SYSDBA";
+    private static final String PASSWORD = "SYSDBA";
+    private static final String DATABASE = "SYSDBA";
+    private static final String SOURCE_TABLE = "e2e_table_source";
+    private static final String SINK_TABLE = "e2e_table_sink";
+    private GenericContainer<?> dmServer;
+    private Connection jdbcConnection;
+
+    @BeforeEach
+    public void beforeAllForDM() {
+        try {
+            dmServer = new GenericContainer<>(DM_DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+            dmServer.setPortBindings(Lists.newArrayList("5236:5236"));
+            Startables.deepStart(Stream.of(dmServer)).join();
+            log.info("dm container started");
+            Class.forName(DRIVER_CLASS);
+            given().ignoreExceptions()
+                .await()
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initializeJdbcConnection);
+            initializeJdbcTable();
+        } catch (Exception ex) {
+            log.error("dm container init failed", ex);
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @AfterEach
+    public void closeGreenplumContainer() throws SQLException {

Review Comment:
   Greenplum?



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcDmdbIT extends FlinkContainer {
+
+    private static final String DOCKER_IMAGE = "laglangyue/dmdb8";
+    private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
+    private static final String HOST = "flink_e2e_dmdb";
+    private static final String LOCAL_HOST = "localhost";
+    private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
+
+    private static final String USERNAME = "SYSDBA";
+    private static final String PASSWORD = "SYSDBA";
+    private static final String DATABASE = "SYSDBA";
+    private static final String SOURCE_TABLE = "e2e_table_source";
+    private static final String SINK_TABLE = "e2e_table_sink";
+    private Connection jdbcConnection;
+    private GenericContainer<?> server;
+
+    @BeforeEach
+    public void startGreenplumContainer() throws ClassNotFoundException, SQLException {
+        server = new GenericContainer<>(DOCKER_IMAGE)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(HOST)
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        server.setPortBindings(Lists.newArrayList(
+            String.format("%s:%s", 5236, 5236)));
+        Startables.deepStart(Stream.of(server)).join();
+        log.info("Greenplum container started");
+        // wait for Greenplum fully start
+        Class.forName(DRIVER_CLASS);
+        given().ignoreExceptions()
+            .await()
+            .atMost(180, TimeUnit.SECONDS)
+            .untilAsserted(this::initializeJdbcConnection);
+        initializeJdbcTable();
+    }
+
+    private void initializeJdbcConnection() throws SQLException {
+        jdbcConnection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
+    }
+
+    /**
+     * init the table for DM_SERVER, DDL and DML for source and sink
+     */
+    private void initializeJdbcTable() {
+        java.net.URL resource = FlinkContainer.class.getResource("/jdbc/init_sql/dm_init.conf");
+        if (resource == null) {
+            throw new IllegalArgumentException("can't find find file");
+        }
+        String file = resource.getFile();
+        Config config = ConfigFactory.parseFile(new File(file));
+        assert config.hasPath("dm_table_source") && config.hasPath("DML") && config.hasPath("dm_table_sink");
+        try (Statement statement = jdbcConnection.createStatement()) {
+            // source
+            String sourceTableDDL = config.getString("dm_table_source");
+            statement.execute(sourceTableDDL);
+            String insertSQL = config.getString("DML");
+            statement.execute(insertSQL);
+            // sink
+            String sinkTableDDL = config.getString("dm_table_sink");
+            statement.execute(sinkTableDDL);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing table failed!", e);
+        }
+    }
+
+    private void assertHasData(String table) {
+        try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
+            Statement statement = connection.createStatement();
+            String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
+            ResultSet source = statement.executeQuery(sql);
+            Assertions.assertTrue(source.next());
+        } catch (SQLException e) {
+            throw new RuntimeException("test dm server image error", e);
+        }
+    }
+
+    @AfterEach
+    public void closeGreenplumContainer() throws SQLException {
+        if (jdbcConnection != null) {
+            jdbcConnection.close();
+        }

Review Comment:
   add close server
   
   ```java
   if (server != null) {
       server.stop();
   }
   ```



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcDmdbIT extends FlinkContainer {
+
+    private static final String DOCKER_IMAGE = "laglangyue/dmdb8";
+    private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
+    private static final String HOST = "flink_e2e_dmdb";
+    private static final String LOCAL_HOST = "localhost";
+    private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
+
+    private static final String USERNAME = "SYSDBA";
+    private static final String PASSWORD = "SYSDBA";
+    private static final String DATABASE = "SYSDBA";
+    private static final String SOURCE_TABLE = "e2e_table_source";
+    private static final String SINK_TABLE = "e2e_table_sink";
+    private Connection jdbcConnection;
+    private GenericContainer<?> server;
+
+    @BeforeEach
+    public void startGreenplumContainer() throws ClassNotFoundException, SQLException {
+        server = new GenericContainer<>(DOCKER_IMAGE)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(HOST)
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        server.setPortBindings(Lists.newArrayList(
+            String.format("%s:%s", 5236, 5236)));
+        Startables.deepStart(Stream.of(server)).join();
+        log.info("Greenplum container started");
+        // wait for Greenplum fully start

Review Comment:
   Greenplum?



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcDmdbIT extends FlinkContainer {
+
+    private static final String DOCKER_IMAGE = "laglangyue/dmdb8";
+    private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
+    private static final String HOST = "flink_e2e_dmdb";
+    private static final String LOCAL_HOST = "localhost";
+    private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
+
+    private static final String USERNAME = "SYSDBA";
+    private static final String PASSWORD = "SYSDBA";
+    private static final String DATABASE = "SYSDBA";
+    private static final String SOURCE_TABLE = "e2e_table_source";
+    private static final String SINK_TABLE = "e2e_table_sink";
+    private Connection jdbcConnection;
+    private GenericContainer<?> server;
+
+    @BeforeEach
+    public void startGreenplumContainer() throws ClassNotFoundException, SQLException {
+        server = new GenericContainer<>(DOCKER_IMAGE)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(HOST)
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        server.setPortBindings(Lists.newArrayList(
+            String.format("%s:%s", 5236, 5236)));
+        Startables.deepStart(Stream.of(server)).join();
+        log.info("Greenplum container started");
+        // wait for Greenplum fully start
+        Class.forName(DRIVER_CLASS);
+        given().ignoreExceptions()
+            .await()
+            .atMost(180, TimeUnit.SECONDS)
+            .untilAsserted(this::initializeJdbcConnection);
+        initializeJdbcTable();
+    }
+
+    private void initializeJdbcConnection() throws SQLException {
+        jdbcConnection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
+    }
+
+    /**
+     * init the table for DM_SERVER, DDL and DML for source and sink
+     */
+    private void initializeJdbcTable() {
+        java.net.URL resource = FlinkContainer.class.getResource("/jdbc/init_sql/dm_init.conf");
+        if (resource == null) {
+            throw new IllegalArgumentException("can't find find file");
+        }
+        String file = resource.getFile();
+        Config config = ConfigFactory.parseFile(new File(file));
+        assert config.hasPath("dm_table_source") && config.hasPath("DML") && config.hasPath("dm_table_sink");
+        try (Statement statement = jdbcConnection.createStatement()) {
+            // source
+            String sourceTableDDL = config.getString("dm_table_source");
+            statement.execute(sourceTableDDL);
+            String insertSQL = config.getString("DML");
+            statement.execute(insertSQL);
+            // sink
+            String sinkTableDDL = config.getString("dm_table_sink");
+            statement.execute(sinkTableDDL);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing table failed!", e);
+        }
+    }
+
+    private void assertHasData(String table) {
+        try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
+            Statement statement = connection.createStatement();
+            String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
+            ResultSet source = statement.executeQuery(sql);
+            Assertions.assertTrue(source.next());
+        } catch (SQLException e) {
+            throw new RuntimeException("test dm server image error", e);
+        }
+    }
+
+    @AfterEach
+    public void closeGreenplumContainer() throws SQLException {

Review Comment:
   Greenplum?



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcDmdbIT extends FlinkContainer {
+
+    private static final String DOCKER_IMAGE = "laglangyue/dmdb8";
+    private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
+    private static final String HOST = "flink_e2e_dmdb";
+    private static final String LOCAL_HOST = "localhost";
+    private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
+
+    private static final String USERNAME = "SYSDBA";
+    private static final String PASSWORD = "SYSDBA";
+    private static final String DATABASE = "SYSDBA";
+    private static final String SOURCE_TABLE = "e2e_table_source";
+    private static final String SINK_TABLE = "e2e_table_sink";
+    private Connection jdbcConnection;
+    private GenericContainer<?> server;
+
+    @BeforeEach
+    public void startGreenplumContainer() throws ClassNotFoundException, SQLException {

Review Comment:
   Greenplum?



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JDBCDmdbIT.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JDBCDmdbIT extends SparkContainer {
+
+    private static final String DM_DOCKER_IMAGE = "laglangyue/dmdb8";
+    private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
+    private static final String HOST = "spark_e2e_dmdb";
+    private static final String LOCAL_HOST = "localhost";
+    private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
+    private static final String USERNAME = "SYSDBA";
+    private static final String PASSWORD = "SYSDBA";
+    private static final String DATABASE = "SYSDBA";
+    private static final String SOURCE_TABLE = "e2e_table_source";
+    private static final String SINK_TABLE = "e2e_table_sink";
+    private GenericContainer<?> dmServer;
+    private Connection jdbcConnection;
+
+    @BeforeEach
+    public void beforeAllForDM() {
+        try {
+            dmServer = new GenericContainer<>(DM_DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+            dmServer.setPortBindings(Lists.newArrayList("5236:5236"));
+            Startables.deepStart(Stream.of(dmServer)).join();
+            log.info("dm container started");
+            Class.forName(DRIVER_CLASS);
+            given().ignoreExceptions()
+                .await()
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initializeJdbcConnection);
+            initializeJdbcTable();
+        } catch (Exception ex) {
+            log.error("dm container init failed", ex);
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @AfterEach
+    public void closeGreenplumContainer() throws SQLException {
+        if (jdbcConnection != null) {
+            jdbcConnection.close();
+        }
+    }
+
+    private void initializeJdbcConnection() throws SQLException {
+        jdbcConnection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
+    }
+
+    /**
+     * init the table for DM_SERVER, DDL and DML for source and sink
+     */
+    private void initializeJdbcTable() {
+        URL resource = JDBCDmdbIT.class.getResource("/jdbc/init_sql/dm_init.conf");
+        if (resource == null) {
+            throw new IllegalArgumentException("can't find find file");
+        }
+        String file = resource.getFile();
+        Config config = ConfigFactory.parseFile(new File(file));
+        assert config.hasPath("dm_table_source") && config.hasPath("DML") && config.hasPath("dm_table_sink");
+        try (Statement statement = jdbcConnection.createStatement()) {
+            // source
+            String sourceTableDDL = config.getString("dm_table_source");
+            statement.execute(sourceTableDDL);
+            String insertSQL = config.getString("DML");
+            statement.execute(insertSQL);
+            // sink
+            String sinkTableDDL = config.getString("dm_table_sink");
+            statement.execute(sinkTableDDL);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing table failed!", e);
+        }
+    }
+
+    private void assertHasData(String table) {
+        try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
+            Statement statement = connection.createStatement();
+            String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
+            ResultSet source = statement.executeQuery(sql);
+            Assertions.assertTrue(source.next());
+        } catch (SQLException e) {
+            throw new RuntimeException("test dm server image error", e);
+        }
+    }
+
+    @Test
+    @DisplayName("JDBC-DM container can be pull")
+    public void testDMDBImage() {
+        assertHasData(SOURCE_TABLE);
+    }

Review Comment:
   remove? 
    Only need to check sink data?



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JDBCDmdbIT.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JDBCDmdbIT extends SparkContainer {
+
+    private static final String DM_DOCKER_IMAGE = "laglangyue/dmdb8";
+    private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
+    private static final String HOST = "spark_e2e_dmdb";
+    private static final String LOCAL_HOST = "localhost";
+    private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
+    private static final String USERNAME = "SYSDBA";
+    private static final String PASSWORD = "SYSDBA";
+    private static final String DATABASE = "SYSDBA";
+    private static final String SOURCE_TABLE = "e2e_table_source";
+    private static final String SINK_TABLE = "e2e_table_sink";
+    private GenericContainer<?> dmServer;
+    private Connection jdbcConnection;
+
+    @BeforeEach
+    public void beforeAllForDM() {
+        try {
+            dmServer = new GenericContainer<>(DM_DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+            dmServer.setPortBindings(Lists.newArrayList("5236:5236"));
+            Startables.deepStart(Stream.of(dmServer)).join();
+            log.info("dm container started");
+            Class.forName(DRIVER_CLASS);
+            given().ignoreExceptions()
+                .await()
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initializeJdbcConnection);
+            initializeJdbcTable();
+        } catch (Exception ex) {
+            log.error("dm container init failed", ex);
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @AfterEach
+    public void closeGreenplumContainer() throws SQLException {
+        if (jdbcConnection != null) {
+            jdbcConnection.close();
+        }

Review Comment:
   add close server
   
   ```java
   if (dmServer != null) {
       dmServer.stop();
   }
   ```



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcDmdbIT extends FlinkContainer {
+
+    private static final String DOCKER_IMAGE = "laglangyue/dmdb8";
+    private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
+    private static final String HOST = "flink_e2e_dmdb";
+    private static final String LOCAL_HOST = "localhost";
+    private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
+
+    private static final String USERNAME = "SYSDBA";
+    private static final String PASSWORD = "SYSDBA";
+    private static final String DATABASE = "SYSDBA";
+    private static final String SOURCE_TABLE = "e2e_table_source";
+    private static final String SINK_TABLE = "e2e_table_sink";
+    private Connection jdbcConnection;
+    private GenericContainer<?> server;
+
+    @BeforeEach
+    public void startGreenplumContainer() throws ClassNotFoundException, SQLException {
+        server = new GenericContainer<>(DOCKER_IMAGE)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(HOST)
+            .withLogConsumer(new Slf4jLogConsumer(log));
+        server.setPortBindings(Lists.newArrayList(
+            String.format("%s:%s", 5236, 5236)));
+        Startables.deepStart(Stream.of(server)).join();
+        log.info("Greenplum container started");
+        // wait for Greenplum fully start
+        Class.forName(DRIVER_CLASS);
+        given().ignoreExceptions()
+            .await()
+            .atMost(180, TimeUnit.SECONDS)
+            .untilAsserted(this::initializeJdbcConnection);
+        initializeJdbcTable();
+    }
+
+    private void initializeJdbcConnection() throws SQLException {
+        jdbcConnection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
+    }
+
+    /**
+     * init the table for DM_SERVER, DDL and DML for source and sink
+     */
+    private void initializeJdbcTable() {
+        java.net.URL resource = FlinkContainer.class.getResource("/jdbc/init_sql/dm_init.conf");
+        if (resource == null) {
+            throw new IllegalArgumentException("can't find find file");
+        }
+        String file = resource.getFile();
+        Config config = ConfigFactory.parseFile(new File(file));
+        assert config.hasPath("dm_table_source") && config.hasPath("DML") && config.hasPath("dm_table_sink");
+        try (Statement statement = jdbcConnection.createStatement()) {
+            // source
+            String sourceTableDDL = config.getString("dm_table_source");
+            statement.execute(sourceTableDDL);
+            String insertSQL = config.getString("DML");
+            statement.execute(insertSQL);
+            // sink
+            String sinkTableDDL = config.getString("dm_table_sink");
+            statement.execute(sinkTableDDL);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing table failed!", e);
+        }
+    }
+
+    private void assertHasData(String table) {
+        try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
+            Statement statement = connection.createStatement();
+            String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
+            ResultSet source = statement.executeQuery(sql);
+            Assertions.assertTrue(source.next());
+        } catch (SQLException e) {
+            throw new RuntimeException("test dm server image error", e);
+        }
+    }
+
+    @AfterEach
+    public void closeGreenplumContainer() throws SQLException {
+        if (jdbcConnection != null) {
+            jdbcConnection.close();
+        }
+    }
+
+    @Test
+    @DisplayName("JDBC-DM container can be pull")
+    public void testDMDBImage() {
+        assertHasData(SOURCE_TABLE);
+    }

Review Comment:
   remove?
   Only need to check sink data?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #2377: [Connector-V2][JDBC-connector] support Jdbc dm

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #2377:
URL: https://github.com/apache/incubator-seatunnel/pull/2377#discussion_r963212345


##########
pom.xml:
##########
@@ -220,7 +221,11 @@
                 <artifactId>postgresql</artifactId>
                 <version>${postgresql.version}</version>
             </dependency>
-
+            <dependency>

Review Comment:
   Hi, please move connector dependency to connector's pom.xml. Reference: #2630 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org