You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/13 09:38:11 UTC

[GitHub] [incubator-seatunnel] nutsjian opened a new pull request, #3089: [Feature][Connector-V2][JDBC] support sqlite

nutsjian opened a new pull request, #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089

   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/incubator-seatunnel/issues).
   
     - Name the pull request in the form "[Feature] [component] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   
   ## Purpose of this pull request
   
   [JDBC] SqLite Connector
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   
   ## Check list
   
   * [ ] Code changed are covered with tests, or it does not need tests for reason:
   * [ ] If any new Jar binary package adding in your PR, please add License Notice according
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 merged pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
EricJoy2048 merged PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3089: [Feature][Connector-V2][JDBC] support sqlite

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#discussion_r997686400


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+public class JdbcSqliteIT extends FlinkContainer {
+    private String tmpdir;
+    private Config config;
+    private static final List<List<Object>> TEST_DATASET = generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";
+
+    private Connection jdbcConnection;
+
+    @Test

Review Comment:
   Remove `@Test`?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] liugddx commented on a diff in pull request #3089: [Feature][Connector-V2][JDBC] support sqlite

Posted by GitBox <gi...@apache.org>.
liugddx commented on code in PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#discussion_r995491414


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.images.builder.Transferable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+public class JdbcSqliteIT extends FlinkContainer {
+    private String tmpdir;
+    private Config config;
+    private static final List<List<Object>> TEST_DATASET = generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";
+
+    private Connection jdbcConnection;
+
+    private void initTestDb() throws Exception {
+        tmpdir = System.getProperty("java.io.tmpdir");
+        Class.forName("org.sqlite.JDBC");
+        jdbcConnection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "test.db", "", "");
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() throws Exception {
+        URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+        config = new ConfigBuilder(Paths.get(resource)).getConfig();
+        CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table",
+                "type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql");
+
+        try {
+            Statement statement = jdbcConnection.createStatement();
+            statement.execute(config.getString("source_table"));
+            statement.execute(config.getString("sink_table"));
+            statement.execute(config.getString("type_source_table"));
+            statement.execute(config.getString("type_sink_table"));
+            statement.execute(config.getString("insert_type_source_table_sql"));
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing Sqlite table failed!", e);
+        }
+    }
+
+    private void batchInsertData() throws SQLException {
+        String sql = "insert into source(age, name) values(?, ?)";
+
+        try {
+            jdbcConnection.setAutoCommit(false);
+            try (PreparedStatement preparedStatement = jdbcConnection.prepareStatement(sql)) {
+                for (List row : TEST_DATASET) {
+                    preparedStatement.setInt(1, (Integer) row.get(0));
+                    preparedStatement.setString(2, (String) row.get(1));
+                    preparedStatement.addBatch();
+                }
+                preparedStatement.executeBatch();
+            }
+            jdbcConnection.commit();
+        } catch (SQLException e) {
+            jdbcConnection.rollback();
+            throw e;
+        }
+    }
+
+    private static List<List<Object>> generateTestDataset() {
+        List<List<Object>> rows = new ArrayList<>();
+        for (int i = 1; i <= 100; i++) {
+            rows.add(Arrays.asList(i, String.format("test_%s", i)));
+        }
+        return rows;
+    }
+
+    @Test
+    public void testJdbcMysqlSourceAndSinkDataType() throws Exception {

Review Comment:
   Please check the method name.



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.images.builder.Transferable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+public class JdbcSqliteIT extends FlinkContainer {
+    private String tmpdir;
+    private Config config;
+    private static final List<List<Object>> TEST_DATASET = generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";
+
+    private Connection jdbcConnection;
+
+    private void initTestDb() throws Exception {
+        tmpdir = System.getProperty("java.io.tmpdir");
+        Class.forName("org.sqlite.JDBC");
+        jdbcConnection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "test.db", "", "");
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() throws Exception {
+        URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+        config = new ConfigBuilder(Paths.get(resource)).getConfig();
+        CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table",
+                "type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql");
+
+        try {
+            Statement statement = jdbcConnection.createStatement();
+            statement.execute(config.getString("source_table"));
+            statement.execute(config.getString("sink_table"));
+            statement.execute(config.getString("type_source_table"));
+            statement.execute(config.getString("type_sink_table"));
+            statement.execute(config.getString("insert_type_source_table_sql"));
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing Sqlite table failed!", e);
+        }
+    }
+
+    private void batchInsertData() throws SQLException {
+        String sql = "insert into source(age, name) values(?, ?)";
+
+        try {
+            jdbcConnection.setAutoCommit(false);
+            try (PreparedStatement preparedStatement = jdbcConnection.prepareStatement(sql)) {
+                for (List row : TEST_DATASET) {
+                    preparedStatement.setInt(1, (Integer) row.get(0));
+                    preparedStatement.setString(2, (String) row.get(1));
+                    preparedStatement.addBatch();
+                }
+                preparedStatement.executeBatch();
+            }
+            jdbcConnection.commit();
+        } catch (SQLException e) {
+            jdbcConnection.rollback();
+            throw e;
+        }
+    }
+
+    private static List<List<Object>> generateTestDataset() {
+        List<List<Object>> rows = new ArrayList<>();
+        for (int i = 1; i <= 100; i++) {
+            rows.add(Arrays.asList(i, String.format("test_%s", i)));
+        }
+        return rows;
+    }
+
+    @Test
+    public void testJdbcMysqlSourceAndSinkDataType() throws Exception {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_sqlite_source_and_sink_datatype.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        jobManager.copyFileFromContainer(Paths.get(SEATUNNEL_HOME, "data", "test.db").toString(), new File(tmpdir + "test.db").toPath().toString());
+        checkSinkDataTypeTable();
+    }
+
+    private void checkSinkDataTypeTable() throws Exception {
+        URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+        config = new ConfigBuilder(Paths.get(resource)).getConfig();
+        CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table",
+                "type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql");
+
+        Statement statement = jdbcConnection.createStatement();
+        ResultSet resultSet = statement.executeQuery(config.getString("check_type_sink_table_sql"));
+        resultSet.next();
+        Assertions.assertEquals(resultSet.getInt(1), 2);
+    }
+
+    @Test
+    public void testJdbcSqliteSourceAndSink() throws IOException, InterruptedException, SQLException {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_sqlite_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+        jobManager.copyFileFromContainer(Paths.get(SEATUNNEL_HOME, "data", "test.db").toString(), new File(tmpdir + "test.db").toPath().toString());
+        // query result
+        String sql = "select age, name from sink order by age asc";
+        List<List> result = new ArrayList<>();
+        try (Statement statement = jdbcConnection.createStatement()) {
+            ResultSet resultSet = statement.executeQuery(sql);
+            while (resultSet.next()) {
+                result.add(Arrays.asList(
+                        resultSet.getInt(1),
+                        resultSet.getString(2)));
+            }
+        }
+        Assertions.assertIterableEquals(TEST_DATASET, result);
+    }
+
+    @AfterEach
+    public void closeGreenplumContainer() throws SQLException, IOException {

Review Comment:
   Please check the method name.



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqliteIT.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.images.builder.Transferable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+public class JdbcSqliteIT extends SparkContainer {
+    private String tmpdir;
+    private Config config;
+    private static final List<List<Object>> TEST_DATASET = generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";
+
+    private Connection jdbcConnection;
+
+    private void initTestDb() throws Exception {
+        tmpdir = System.getProperty("java.io.tmpdir");
+        Class.forName("org.sqlite.JDBC");
+        jdbcConnection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "test.db", "", "");
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() throws Exception {
+        URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+        config = new ConfigBuilder(Paths.get(resource)).getConfig();
+        CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table",
+                "type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql");
+
+        try {
+            Statement statement = jdbcConnection.createStatement();
+            statement.execute(config.getString("source_table"));
+            statement.execute(config.getString("sink_table"));
+            statement.execute(config.getString("type_source_table"));
+            statement.execute(config.getString("type_sink_table"));
+            statement.execute(config.getString("insert_type_source_table_sql"));
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing Sqlite table failed!", e);
+        }
+    }
+
+    private void batchInsertData() throws SQLException {
+        String sql = "insert into source(age, name) values(?, ?)";
+
+        try {
+            jdbcConnection.setAutoCommit(false);
+            try (PreparedStatement preparedStatement = jdbcConnection.prepareStatement(sql)) {
+                for (List row : TEST_DATASET) {
+                    preparedStatement.setInt(1, (Integer) row.get(0));
+                    preparedStatement.setString(2, (String) row.get(1));
+                    preparedStatement.addBatch();
+                }
+                preparedStatement.executeBatch();
+            }
+            jdbcConnection.commit();
+        } catch (SQLException e) {
+            jdbcConnection.rollback();
+            throw e;
+        }
+    }
+
+    private static List<List<Object>> generateTestDataset() {
+        List<List<Object>> rows = new ArrayList<>();
+        for (int i = 1; i <= 100; i++) {
+            rows.add(Arrays.asList(i, String.format("test_%s", i)));
+        }
+        return rows;
+    }
+
+    @Test
+    public void testJdbcMysqlSourceAndSinkDataType() throws Exception {

Review Comment:
   Please check the method name.



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqliteIT.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.images.builder.Transferable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+public class JdbcSqliteIT extends SparkContainer {
+    private String tmpdir;
+    private Config config;
+    private static final List<List<Object>> TEST_DATASET = generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";
+
+    private Connection jdbcConnection;
+
+    private void initTestDb() throws Exception {
+        tmpdir = System.getProperty("java.io.tmpdir");
+        Class.forName("org.sqlite.JDBC");
+        jdbcConnection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "test.db", "", "");
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() throws Exception {
+        URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+        config = new ConfigBuilder(Paths.get(resource)).getConfig();
+        CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table",
+                "type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql");
+
+        try {
+            Statement statement = jdbcConnection.createStatement();
+            statement.execute(config.getString("source_table"));
+            statement.execute(config.getString("sink_table"));
+            statement.execute(config.getString("type_source_table"));
+            statement.execute(config.getString("type_sink_table"));
+            statement.execute(config.getString("insert_type_source_table_sql"));
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing Sqlite table failed!", e);
+        }
+    }
+
+    private void batchInsertData() throws SQLException {
+        String sql = "insert into source(age, name) values(?, ?)";
+
+        try {
+            jdbcConnection.setAutoCommit(false);
+            try (PreparedStatement preparedStatement = jdbcConnection.prepareStatement(sql)) {
+                for (List row : TEST_DATASET) {
+                    preparedStatement.setInt(1, (Integer) row.get(0));
+                    preparedStatement.setString(2, (String) row.get(1));
+                    preparedStatement.addBatch();
+                }
+                preparedStatement.executeBatch();
+            }
+            jdbcConnection.commit();
+        } catch (SQLException e) {
+            jdbcConnection.rollback();
+            throw e;
+        }
+    }
+
+    private static List<List<Object>> generateTestDataset() {
+        List<List<Object>> rows = new ArrayList<>();
+        for (int i = 1; i <= 100; i++) {
+            rows.add(Arrays.asList(i, String.format("test_%s", i)));
+        }
+        return rows;
+    }
+
+    @Test
+    public void testJdbcMysqlSourceAndSinkDataType() throws Exception {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_sqlite_source_and_sink_datatype.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        master.copyFileFromContainer(Paths.get(SEATUNNEL_HOME, "data", "test.db").toString(), new File(tmpdir + "test.db").toPath().toString());
+        checkSinkDataTypeTable();
+    }
+
+    private void checkSinkDataTypeTable() throws Exception {
+        URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+        config = new ConfigBuilder(Paths.get(resource)).getConfig();
+        CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table",
+                "type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql");
+
+        Statement statement = jdbcConnection.createStatement();
+        ResultSet resultSet = statement.executeQuery(config.getString("check_type_sink_table_sql"));
+        resultSet.next();
+        Assertions.assertEquals(resultSet.getInt(1), 2);
+    }
+
+    @Test
+    public void testJdbcSqliteSourceAndSink() throws IOException, InterruptedException, SQLException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_sqlite_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+        master.copyFileFromContainer(Paths.get(SEATUNNEL_HOME, "data", "test.db").toString(), new File(tmpdir + "test.db").toPath().toString());
+        // query result
+        String sql = "select age, name from sink order by age asc";
+        List<List> result = new ArrayList<>();
+        try (Statement statement = jdbcConnection.createStatement()) {
+            ResultSet resultSet = statement.executeQuery(sql);
+            while (resultSet.next()) {
+                result.add(Arrays.asList(
+                        resultSet.getInt(1),
+                        resultSet.getString(2)));
+            }
+        }
+        Assertions.assertIterableEquals(TEST_DATASET, result);
+    }
+
+    @AfterEach
+    public void closeGreenplumContainer() throws SQLException, IOException {

Review Comment:
   Please check the method name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] nutsjian commented on pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
nutsjian commented on PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#issuecomment-1285686817

   @EricJoy2048 Please help me run workflows again when you're not busy, thx. I have looked at the failed log, it seems that the doc has not been merged before.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3089: [Feature][Connector-V2][JDBC] support sqlite

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#discussion_r994675117


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf:
##########
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+    # You can set flink configuration here
+    execution.parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+    Jdbc {
+        driver = org.sqlite.JDBC
+        url = "jdbc:sqlite:/tmp/seatunnel/data/test.db"
+        user = ""
+        password = ""
+        type_affinity = true
+        query = "select age, name from source"

Review Comment:
   test all supported datatypes?
   
   reference
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_oracle_source_to_sink.conf#L36
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] nutsjian commented on pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
nutsjian commented on PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#issuecomment-1306697649

   @EricJoy2048 please help me to rerun the CI workflows.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] nutsjian commented on a diff in pull request #3089: [Feature][Connector-V2][JDBC] support sqlite

Posted by GitBox <gi...@apache.org>.
nutsjian commented on code in PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#discussion_r994596365


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.images.builder.Transferable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+@Slf4j
+public class JdbcSqliteIT extends FlinkContainer {
+
+    private static final String SQLITE_USER = "";
+    private static final String SQLITE_PASSWORD = "";
+    private static final String SQLITE_DRIVER = "org.sqlite.JDBC";
+    private String tmpdir;
+    private static final List<List<Object>> TEST_DATASET = generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";
+
+    private Connection jdbcConnection;
+
+    private void initTestDb() throws ClassNotFoundException, SQLException {
+        tmpdir = System.getProperty("java.io.tmpdir");
+        Class.forName(SQLITE_DRIVER);
+        jdbcConnection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "test.db", SQLITE_USER, SQLITE_PASSWORD);
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() throws SQLException {
+        try (Statement statement = jdbcConnection.createStatement()) {
+            statement.execute("DROP TABLE IF EXISTS source");
+            statement.execute("DROP TABLE IF EXISTS sink");
+            String createSource = "CREATE TABLE source (\n" +
+                    "age INT NOT NULL,\n" +
+                    "name VARCHAR(255) NOT NULL\n" +
+                    ")";
+            String createSink = "CREATE TABLE sink (\n" +
+                    "age INT NOT NULL,\n" +
+                    "name VARCHAR(255) NOT NULL\n" +
+                    ")";

Review Comment:
   Roger that. I'm fixing it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] nutsjian commented on pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
nutsjian commented on PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#issuecomment-1314757326

   @Hisoka-X @EricJoy2048 conflict resolved, please help me rerun the CI workflows, thx.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] nutsjian commented on pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
nutsjian commented on PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#issuecomment-1311137135

   > Please add e2e in flink/spark e2e module. So that e2e can be start in CI. <img alt="image" width="1000" src="https://user-images.githubusercontent.com/32387433/200732205-1f4984a8-f704-41e2-bb1e-b22d9a663360.png">
   
   thx man, done. I have merged upstream dev, and add flink/spark connector-elasticsearch-e2e module. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] nutsjian commented on a diff in pull request #3089: [Feature][Connector-V2][JDBC] support sqlite

Posted by GitBox <gi...@apache.org>.
nutsjian commented on code in PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#discussion_r994706628


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf:
##########
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+    # You can set flink configuration here
+    execution.parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+    Jdbc {
+        driver = org.sqlite.JDBC
+        url = "jdbc:sqlite:/tmp/seatunnel/data/test.db"
+        user = ""
+        password = ""
+        type_affinity = true
+        query = "select age, name from source"

Review Comment:
   Thanks man. I'm fixing it. SQLite uses a more general dynamic type system and type affinity (https://www.sqlite.org/datatype3.html). So I consider test all supported datatypes refer to the JDBC mysql e2e test, workable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 commented on pull request #3089: [Feature][Connector-V2][JDBC] support sqlite

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#issuecomment-1285253565

   Please add `Source` or `Sink` or `Source & Sink` in the pr title.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#issuecomment-1308161354

   Please add e2e in flink/spark e2e module. So that e2e can be start in CI.
   <img width="1000" alt="image" src="https://user-images.githubusercontent.com/32387433/200732205-1f4984a8-f704-41e2-bb1e-b22d9a663360.png">
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] liugddx commented on a diff in pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
liugddx commented on code in PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#discussion_r995719643


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.images.builder.Transferable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+public class JdbcSqliteIT extends FlinkContainer {
+    private String tmpdir;
+    private Config config;
+    private static final List<List<Object>> TEST_DATASET = generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";
+
+    private Connection jdbcConnection;
+
+    private void initTestDb() throws Exception {
+        tmpdir = System.getProperty("java.io.tmpdir");
+        Class.forName("org.sqlite.JDBC");
+        jdbcConnection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "test.db", "", "");
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() throws Exception {
+        URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+        config = new ConfigBuilder(Paths.get(resource)).getConfig();
+        CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table",
+                "type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql");
+
+        try {
+            Statement statement = jdbcConnection.createStatement();
+            statement.execute(config.getString("source_table"));
+            statement.execute(config.getString("sink_table"));
+            statement.execute(config.getString("type_source_table"));
+            statement.execute(config.getString("type_sink_table"));
+            statement.execute(config.getString("insert_type_source_table_sql"));
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing Sqlite table failed!", e);
+        }
+    }
+
+    private void batchInsertData() throws SQLException {
+        String sql = "insert into source(age, name) values(?, ?)";

Review Comment:
   Can you test all data types?
   
   



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.images.builder.Transferable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+public class JdbcSqliteIT extends FlinkContainer {
+    private String tmpdir;
+    private Config config;
+    private static final List<List<Object>> TEST_DATASET = generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";
+
+    private Connection jdbcConnection;
+
+    private void initTestDb() throws Exception {
+        tmpdir = System.getProperty("java.io.tmpdir");
+        Class.forName("org.sqlite.JDBC");
+        jdbcConnection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "test.db", "", "");
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() throws Exception {
+        URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();

Review Comment:
   JdbcMysqlIT?



##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java:
##########
@@ -135,6 +137,14 @@ private SeaTunnelRowType initTableField(Connection conn) {
             }
         } catch (Exception e) {
             LOG.warn("get row type info exception", e);
+        } finally {
+            if (Objects.nonNull(resultSet)) {
+                try {

Review Comment:
   What is the role of this code?



##########
docs/en/connector-v2/sink/Jdbc.md:
##########
@@ -115,6 +115,7 @@ there are some reference value for params above.
 | phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /                                                  | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
 | sqlserver  | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433                    | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
 | oracle     | oracle.jdbc.OracleDriver                     | jdbc:oracle:thin:@localhost:1521/xepdb1                      | oracle.jdbc.xa.OracleXADataSource                  | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
+| sqlite     | org.sqlite.JDBC                              | Jdbc:sqlite:test.db                                          | /                                                  | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc    |

Review Comment:
   Driver right?



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqliteIT.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.images.builder.Transferable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+public class JdbcSqliteIT extends SparkContainer {
+    private String tmpdir;
+    private Config config;
+    private static final List<List<Object>> TEST_DATASET = generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";
+
+    private Connection jdbcConnection;
+
+    private void initTestDb() throws Exception {
+        tmpdir = System.getProperty("java.io.tmpdir");
+        Class.forName("org.sqlite.JDBC");
+        jdbcConnection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "test.db", "", "");
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() throws Exception {
+        URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+        config = new ConfigBuilder(Paths.get(resource)).getConfig();
+        CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table",
+                "type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql");
+
+        try {
+            Statement statement = jdbcConnection.createStatement();
+            statement.execute(config.getString("source_table"));
+            statement.execute(config.getString("sink_table"));
+            statement.execute(config.getString("type_source_table"));
+            statement.execute(config.getString("type_sink_table"));
+            statement.execute(config.getString("insert_type_source_table_sql"));
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing Sqlite table failed!", e);
+        }
+    }
+
+    private void batchInsertData() throws SQLException {
+        String sql = "insert into source(age, name) values(?, ?)";
+
+        try {
+            jdbcConnection.setAutoCommit(false);
+            try (PreparedStatement preparedStatement = jdbcConnection.prepareStatement(sql)) {
+                for (List row : TEST_DATASET) {
+                    preparedStatement.setInt(1, (Integer) row.get(0));
+                    preparedStatement.setString(2, (String) row.get(1));
+                    preparedStatement.addBatch();
+                }
+                preparedStatement.executeBatch();
+            }
+            jdbcConnection.commit();
+        } catch (SQLException e) {
+            jdbcConnection.rollback();
+            throw e;
+        }
+    }
+
+    private static List<List<Object>> generateTestDataset() {
+        List<List<Object>> rows = new ArrayList<>();
+        for (int i = 1; i <= 100; i++) {
+            rows.add(Arrays.asList(i, String.format("test_%s", i)));
+        }
+        return rows;
+    }
+
+    @Test
+    public void testJdbcSqliteSourceAndSinkDataType() throws Exception {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_sqlite_source_and_sink_datatype.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        master.copyFileFromContainer(Paths.get(SEATUNNEL_HOME, "data", "test.db").toString(), new File(tmpdir + "test.db").toPath().toString());
+        checkSinkDataTypeTable();
+    }
+
+    private void checkSinkDataTypeTable() throws Exception {
+        URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();

Review Comment:
   JdbcMysqlIT?



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqliteIT.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.images.builder.Transferable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+public class JdbcSqliteIT extends SparkContainer {
+    private String tmpdir;
+    private Config config;
+    private static final List<List<Object>> TEST_DATASET = generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";
+
+    private Connection jdbcConnection;
+
+    private void initTestDb() throws Exception {
+        tmpdir = System.getProperty("java.io.tmpdir");
+        Class.forName("org.sqlite.JDBC");
+        jdbcConnection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "test.db", "", "");
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() throws Exception {
+        URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();

Review Comment:
   JdbcMysqlIT?



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.images.builder.Transferable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+public class JdbcSqliteIT extends FlinkContainer {
+    private String tmpdir;
+    private Config config;
+    private static final List<List<Object>> TEST_DATASET = generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";
+
+    private Connection jdbcConnection;
+
+    private void initTestDb() throws Exception {
+        tmpdir = System.getProperty("java.io.tmpdir");
+        Class.forName("org.sqlite.JDBC");
+        jdbcConnection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "test.db", "", "");
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() throws Exception {
+        URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+        config = new ConfigBuilder(Paths.get(resource)).getConfig();
+        CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table",
+                "type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql");
+
+        try {
+            Statement statement = jdbcConnection.createStatement();
+            statement.execute(config.getString("source_table"));
+            statement.execute(config.getString("sink_table"));
+            statement.execute(config.getString("type_source_table"));
+            statement.execute(config.getString("type_sink_table"));
+            statement.execute(config.getString("insert_type_source_table_sql"));
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing Sqlite table failed!", e);
+        }
+    }
+
+    private void batchInsertData() throws SQLException {
+        String sql = "insert into source(age, name) values(?, ?)";
+
+        try {
+            jdbcConnection.setAutoCommit(false);
+            try (PreparedStatement preparedStatement = jdbcConnection.prepareStatement(sql)) {
+                for (List row : TEST_DATASET) {
+                    preparedStatement.setInt(1, (Integer) row.get(0));
+                    preparedStatement.setString(2, (String) row.get(1));
+                    preparedStatement.addBatch();
+                }
+                preparedStatement.executeBatch();
+            }
+            jdbcConnection.commit();
+        } catch (SQLException e) {
+            jdbcConnection.rollback();
+            throw e;
+        }
+    }
+
+    private static List<List<Object>> generateTestDataset() {
+        List<List<Object>> rows = new ArrayList<>();
+        for (int i = 1; i <= 100; i++) {
+            rows.add(Arrays.asList(i, String.format("test_%s", i)));
+        }
+        return rows;
+    }
+
+    @Test
+    public void testJdbcSqliteSourceAndSinkDataType() throws Exception {
+        Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_sqlite_source_and_sink_datatype.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        jobManager.copyFileFromContainer(Paths.get(SEATUNNEL_HOME, "data", "test.db").toString(), new File(tmpdir + "test.db").toPath().toString());
+        checkSinkDataTypeTable();
+    }
+
+    private void checkSinkDataTypeTable() throws Exception {
+        URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();

Review Comment:
   JdbcMysqlIT?



##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteTypeMapper.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+@Slf4j
+public class SqliteTypeMapper implements JdbcDialectTypeMapper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqliteTypeMapper.class);

Review Comment:
   Useless code



##########
seatunnel-connectors-v2/connector-jdbc/pom.xml:
##########
@@ -130,9 +142,17 @@
             <artifactId>ojdbc8</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.xerial</groupId>
+            <artifactId>sqlite-jdbc</artifactId>
+         </dependency>
+         <dependency>
             <groupId>com.ibm.db2.jcc</groupId>
             <artifactId>db2jcc</artifactId>
         </dependency>
+        <dependency>

Review Comment:
   duplicated code



##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConfig.java:
##########
@@ -95,6 +94,7 @@ public static JdbcConnectionOptions buildJdbcConnectionOptions(Config config) {
                 jdbcOptions.transactionTimeoutSec = config.getInt(JdbcConfig.TRANSACTION_TIMEOUT_SEC);
             }
         }
+

Review Comment:
   Please revert it.



##########
seatunnel-connectors-v2/connector-jdbc/pom.xml:
##########
@@ -78,20 +80,30 @@
                 <version>${oracle.version}</version>
                 <scope>provided</scope>
             </dependency>
+            <dependency>
+                <groupId>org.xerial</groupId>
+                <artifactId>sqlite-jdbc</artifactId>
+                <version>${sqlite.version}</version>
+                <scope>provided</scope>
+            </dependency>
             <dependency>
                 <groupId>com.ibm.db2.jcc</groupId>
                 <artifactId>db2jcc</artifactId>
                 <version>${db2.version}</version>
                 <scope>provided</scope>
             </dependency>
-
+            <dependency>
+                <groupId>org.xerial</groupId>
+                <artifactId>sqlite-jdbc</artifactId>
+                <version>${sqlite.version}</version>
+                <scope>provided</scope>

Review Comment:
   duplicated code.



##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml:
##########
@@ -36,6 +36,7 @@
         <module>connector-iceberg-flink-e2e</module>
         <module>connector-neo4j-flink-e2e</module>
         <module>connector-kafka-flink-e2e</module>
+        <module>connector-elasticsearch-flink-e2e</module>

Review Comment:
   Does this code belong to this PR?



##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml:
##########
@@ -36,6 +36,7 @@
         <module>connector-neo4j-spark-e2e</module>
         <module>connector-kafka-spark-e2e</module>
         <module>connector-iceberg-spark-e2e</module>
+        <module>connector-elasticsearch-spark-e2e</module>

Review Comment:
   Does this code belong to this PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] nutsjian commented on a diff in pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
nutsjian commented on code in PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#discussion_r1024681661


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml:
##########
@@ -36,6 +36,7 @@
         <module>connector-iceberg-flink-e2e</module>
         <module>connector-neo4j-flink-e2e</module>
         <module>connector-kafka-flink-e2e</module>
+        <module>connector-elasticsearch-flink-e2e</module>

Review Comment:
   > > Please add e2e in flink/spark e2e module. So that e2e can be start in CI. <img alt="image" width="1000" src="https://user-images.githubusercontent.com/32387433/200732205-1f4984a8-f704-41e2-bb1e-b22d9a663360.png">
   > 
   > thx man, done. I have merged upstream dev, and add flink/spark connector-elasticsearch-e2e module.
   
   
   this code is for this comment. I merge upstream dev, and add elasticsearch-flink-e2e in pom.xml. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 closed pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
EricJoy2048 closed pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink
URL: https://github.com/apache/incubator-seatunnel/pull/3089


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] nutsjian commented on a diff in pull request #3089: [Feature][Connector-V2][JDBC] support sqlite

Posted by GitBox <gi...@apache.org>.
nutsjian commented on code in PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#discussion_r995526518


##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqliteIT.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.images.builder.Transferable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+public class JdbcSqliteIT extends SparkContainer {
+    private String tmpdir;
+    private Config config;
+    private static final List<List<Object>> TEST_DATASET = generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";
+
+    private Connection jdbcConnection;
+
+    private void initTestDb() throws Exception {
+        tmpdir = System.getProperty("java.io.tmpdir");
+        Class.forName("org.sqlite.JDBC");
+        jdbcConnection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "test.db", "", "");
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() throws Exception {
+        URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+        config = new ConfigBuilder(Paths.get(resource)).getConfig();
+        CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table",
+                "type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql");
+
+        try {
+            Statement statement = jdbcConnection.createStatement();
+            statement.execute(config.getString("source_table"));
+            statement.execute(config.getString("sink_table"));
+            statement.execute(config.getString("type_source_table"));
+            statement.execute(config.getString("type_sink_table"));
+            statement.execute(config.getString("insert_type_source_table_sql"));
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing Sqlite table failed!", e);
+        }
+    }
+
+    private void batchInsertData() throws SQLException {
+        String sql = "insert into source(age, name) values(?, ?)";
+
+        try {
+            jdbcConnection.setAutoCommit(false);
+            try (PreparedStatement preparedStatement = jdbcConnection.prepareStatement(sql)) {
+                for (List row : TEST_DATASET) {
+                    preparedStatement.setInt(1, (Integer) row.get(0));
+                    preparedStatement.setString(2, (String) row.get(1));
+                    preparedStatement.addBatch();
+                }
+                preparedStatement.executeBatch();
+            }
+            jdbcConnection.commit();
+        } catch (SQLException e) {
+            jdbcConnection.rollback();
+            throw e;
+        }
+    }
+
+    private static List<List<Object>> generateTestDataset() {
+        List<List<Object>> rows = new ArrayList<>();
+        for (int i = 1; i <= 100; i++) {
+            rows.add(Arrays.asList(i, String.format("test_%s", i)));
+        }
+        return rows;
+    }
+
+    @Test
+    public void testJdbcMysqlSourceAndSinkDataType() throws Exception {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_sqlite_source_and_sink_datatype.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        master.copyFileFromContainer(Paths.get(SEATUNNEL_HOME, "data", "test.db").toString(), new File(tmpdir + "test.db").toPath().toString());
+        checkSinkDataTypeTable();
+    }
+
+    private void checkSinkDataTypeTable() throws Exception {
+        URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+        config = new ConfigBuilder(Paths.get(resource)).getConfig();
+        CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table",
+                "type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql");
+
+        Statement statement = jdbcConnection.createStatement();
+        ResultSet resultSet = statement.executeQuery(config.getString("check_type_sink_table_sql"));
+        resultSet.next();
+        Assertions.assertEquals(resultSet.getInt(1), 2);
+    }
+
+    @Test
+    public void testJdbcSqliteSourceAndSink() throws IOException, InterruptedException, SQLException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_sqlite_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+        master.copyFileFromContainer(Paths.get(SEATUNNEL_HOME, "data", "test.db").toString(), new File(tmpdir + "test.db").toPath().toString());
+        // query result
+        String sql = "select age, name from sink order by age asc";
+        List<List> result = new ArrayList<>();
+        try (Statement statement = jdbcConnection.createStatement()) {
+            ResultSet resultSet = statement.executeQuery(sql);
+            while (resultSet.next()) {
+                result.add(Arrays.asList(
+                        resultSet.getInt(1),
+                        resultSet.getString(2)));
+            }
+        }
+        Assertions.assertIterableEquals(TEST_DATASET, result);
+    }
+
+    @AfterEach
+    public void closeGreenplumContainer() throws SQLException, IOException {

Review Comment:
   thx, resolved



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] nutsjian commented on a diff in pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
nutsjian commented on code in PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#discussion_r1024680270


##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqliteIT.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.images.builder.Transferable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+public class JdbcSqliteIT extends SparkContainer {
+    private String tmpdir;
+    private Config config;
+    private static final List<List<Object>> TEST_DATASET = generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";
+
+    private Connection jdbcConnection;
+
+    private void initTestDb() throws Exception {
+        tmpdir = System.getProperty("java.io.tmpdir");
+        Class.forName("org.sqlite.JDBC");
+        jdbcConnection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "test.db", "", "");
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() throws Exception {
+        URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+        config = new ConfigBuilder(Paths.get(resource)).getConfig();
+        CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table",
+                "type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql");
+
+        try {
+            Statement statement = jdbcConnection.createStatement();
+            statement.execute(config.getString("source_table"));
+            statement.execute(config.getString("sink_table"));
+            statement.execute(config.getString("type_source_table"));
+            statement.execute(config.getString("type_sink_table"));
+            statement.execute(config.getString("insert_type_source_table_sql"));
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing Sqlite table failed!", e);
+        }
+    }
+
+    private void batchInsertData() throws SQLException {
+        String sql = "insert into source(age, name) values(?, ?)";
+
+        try {
+            jdbcConnection.setAutoCommit(false);
+            try (PreparedStatement preparedStatement = jdbcConnection.prepareStatement(sql)) {
+                for (List row : TEST_DATASET) {
+                    preparedStatement.setInt(1, (Integer) row.get(0));
+                    preparedStatement.setString(2, (String) row.get(1));
+                    preparedStatement.addBatch();
+                }
+                preparedStatement.executeBatch();
+            }
+            jdbcConnection.commit();
+        } catch (SQLException e) {
+            jdbcConnection.rollback();
+            throw e;
+        }
+    }
+
+    private static List<List<Object>> generateTestDataset() {
+        List<List<Object>> rows = new ArrayList<>();
+        for (int i = 1; i <= 100; i++) {
+            rows.add(Arrays.asList(i, String.format("test_%s", i)));
+        }
+        return rows;
+    }
+
+    @Test
+    public void testJdbcSqliteSourceAndSinkDataType() throws Exception {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_sqlite_source_and_sink_datatype.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        master.copyFileFromContainer(Paths.get(SEATUNNEL_HOME, "data", "test.db").toString(), new File(tmpdir + "test.db").toPath().toString());
+        checkSinkDataTypeTable();
+    }
+
+    private void checkSinkDataTypeTable() throws Exception {
+        URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();

Review Comment:
   I've been fixed and commit before, is there any problem with my git operation steps, request for comments. 



##########
docs/en/connector-v2/sink/Jdbc.md:
##########
@@ -115,6 +115,7 @@ there are some reference value for params above.
 | phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /                                                  | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
 | sqlserver  | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433                    | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
 | oracle     | oracle.jdbc.OracleDriver                     | jdbc:oracle:thin:@localhost:1521/xepdb1                      | oracle.jdbc.xa.OracleXADataSource                  | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
+| sqlite     | org.sqlite.JDBC                              | Jdbc:sqlite:test.db                                          | /                                                  | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc    |

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] nutsjian commented on pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
nutsjian commented on PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#issuecomment-1314597596

   @EricJoy2048 please help me rerun ci workflows, if the unit test error has been resolved. thx.
   
   ```bash
   2022-11-11T02:48:26.4091671Z [ERROR] Failed to execute goal on project connector-jdbc-spark-e2e: Could not resolve dependencies for project org.apache.seatunnel:connector-jdbc-spark-e2e:jar:2.1.3-SNAPSHOT: Failed to collect dependencies at org.testcontainers:postgresql:jar:1.17.3: Failed to read artifact descriptor for org.testcontainers:postgresql:jar:1.17.3: Could not transfer artifact org.testcontainers:postgresql:pom:1.17.3 from/to central (https://repo.maven.apache.org/maven2): transfer failed for https://repo.maven.apache.org/maven2/org/testcontainers/postgresql/1.17.3/postgresql-1.17.3.pom: Connection reset -> [Help 1]
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#issuecomment-1314679839

   Please resolve conflict, Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] nutsjian commented on a diff in pull request #3089: [Feature][Connector-V2][JDBC] support sqlite

Posted by GitBox <gi...@apache.org>.
nutsjian commented on code in PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#discussion_r995526518


##########
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqliteIT.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.images.builder.Transferable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+public class JdbcSqliteIT extends SparkContainer {
+    private String tmpdir;
+    private Config config;
+    private static final List<List<Object>> TEST_DATASET = generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";
+
+    private Connection jdbcConnection;
+
+    private void initTestDb() throws Exception {
+        tmpdir = System.getProperty("java.io.tmpdir");
+        Class.forName("org.sqlite.JDBC");
+        jdbcConnection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "test.db", "", "");
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() throws Exception {
+        URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+        config = new ConfigBuilder(Paths.get(resource)).getConfig();
+        CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table",
+                "type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql");
+
+        try {
+            Statement statement = jdbcConnection.createStatement();
+            statement.execute(config.getString("source_table"));
+            statement.execute(config.getString("sink_table"));
+            statement.execute(config.getString("type_source_table"));
+            statement.execute(config.getString("type_sink_table"));
+            statement.execute(config.getString("insert_type_source_table_sql"));
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing Sqlite table failed!", e);
+        }
+    }
+
+    private void batchInsertData() throws SQLException {
+        String sql = "insert into source(age, name) values(?, ?)";
+
+        try {
+            jdbcConnection.setAutoCommit(false);
+            try (PreparedStatement preparedStatement = jdbcConnection.prepareStatement(sql)) {
+                for (List row : TEST_DATASET) {
+                    preparedStatement.setInt(1, (Integer) row.get(0));
+                    preparedStatement.setString(2, (String) row.get(1));
+                    preparedStatement.addBatch();
+                }
+                preparedStatement.executeBatch();
+            }
+            jdbcConnection.commit();
+        } catch (SQLException e) {
+            jdbcConnection.rollback();
+            throw e;
+        }
+    }
+
+    private static List<List<Object>> generateTestDataset() {
+        List<List<Object>> rows = new ArrayList<>();
+        for (int i = 1; i <= 100; i++) {
+            rows.add(Arrays.asList(i, String.format("test_%s", i)));
+        }
+        return rows;
+    }
+
+    @Test
+    public void testJdbcMysqlSourceAndSinkDataType() throws Exception {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_sqlite_source_and_sink_datatype.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        master.copyFileFromContainer(Paths.get(SEATUNNEL_HOME, "data", "test.db").toString(), new File(tmpdir + "test.db").toPath().toString());
+        checkSinkDataTypeTable();
+    }
+
+    private void checkSinkDataTypeTable() throws Exception {
+        URI resource = Objects.requireNonNull(JdbcMysqlIT.class.getResource("/jdbc/init_sql/sqlite_init.conf")).toURI();
+        config = new ConfigBuilder(Paths.get(resource)).getConfig();
+        CheckConfigUtil.checkAllExists(this.config, "source_table", "sink_table", "type_source_table",
+                "type_sink_table", "insert_type_source_table_sql", "check_type_sink_table_sql");
+
+        Statement statement = jdbcConnection.createStatement();
+        ResultSet resultSet = statement.executeQuery(config.getString("check_type_sink_table_sql"));
+        resultSet.next();
+        Assertions.assertEquals(resultSet.getInt(1), 2);
+    }
+
+    @Test
+    public void testJdbcSqliteSourceAndSink() throws IOException, InterruptedException, SQLException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_sqlite_source_and_sink.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+        master.copyFileFromContainer(Paths.get(SEATUNNEL_HOME, "data", "test.db").toString(), new File(tmpdir + "test.db").toPath().toString());
+        // query result
+        String sql = "select age, name from sink order by age asc";
+        List<List> result = new ArrayList<>();
+        try (Statement statement = jdbcConnection.createStatement()) {
+            ResultSet resultSet = statement.executeQuery(sql);
+            while (resultSet.next()) {
+                result.add(Arrays.asList(
+                        resultSet.getInt(1),
+                        resultSet.getString(2)));
+            }
+        }
+        Assertions.assertIterableEquals(TEST_DATASET, result);
+    }
+
+    @AfterEach
+    public void closeGreenplumContainer() throws SQLException, IOException {

Review Comment:
   thx, sloved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#issuecomment-1320824299

   Please fix ci problem. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] liugddx commented on a diff in pull request #3089: [Feature][Connector-V2][JDBC] support sqlite

Posted by GitBox <gi...@apache.org>.
liugddx commented on code in PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#discussion_r994420396


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.images.builder.Transferable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+@Slf4j
+public class JdbcSqliteIT extends FlinkContainer {
+
+    private static final String SQLITE_USER = "";
+    private static final String SQLITE_PASSWORD = "";
+    private static final String SQLITE_DRIVER = "org.sqlite.JDBC";
+    private String tmpdir;
+    private static final List<List<Object>> TEST_DATASET = generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";
+
+    private Connection jdbcConnection;
+
+    private void initTestDb() throws ClassNotFoundException, SQLException {
+        tmpdir = System.getProperty("java.io.tmpdir");
+        Class.forName(SQLITE_DRIVER);
+        jdbcConnection = DriverManager.getConnection("jdbc:sqlite:" + tmpdir + "test.db", SQLITE_USER, SQLITE_PASSWORD);
+        initializeJdbcTable();
+        batchInsertData();
+    }
+
+    private void initializeJdbcTable() throws SQLException {
+        try (Statement statement = jdbcConnection.createStatement()) {
+            statement.execute("DROP TABLE IF EXISTS source");
+            statement.execute("DROP TABLE IF EXISTS sink");
+            String createSource = "CREATE TABLE source (\n" +
+                    "age INT NOT NULL,\n" +
+                    "name VARCHAR(255) NOT NULL\n" +
+                    ")";
+            String createSink = "CREATE TABLE sink (\n" +
+                    "age INT NOT NULL,\n" +
+                    "name VARCHAR(255) NOT NULL\n" +
+                    ")";

Review Comment:
   Please test all field types



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] nutsjian commented on a diff in pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
nutsjian commented on code in PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#discussion_r1004090792


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteTypeMapper.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+@Slf4j
+public class SqliteTypeMapper implements JdbcDialectTypeMapper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqliteTypeMapper.class);
+
+    // ============================data types=====================
+
+    private static final String SQLITE_UNKNOWN = "UNKNOWN";
+    private static final String SQLITE_BIT = "BIT";
+    private static final String SQLITE_BOOLEAN = "BOOLEAN";
+
+    // -------------------------integer----------------------------
+    private static final String SQLITE_TINYINT = "TINYINT";
+    private static final String SQLITE_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    private static final String SQLITE_SMALLINT = "SMALLINT";
+    private static final String SQLITE_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    private static final String SQLITE_MEDIUMINT = "MEDIUMINT";
+    private static final String SQLITE_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    private static final String SQLITE_INT = "INT";
+    private static final String SQLITE_INT_UNSIGNED = "INT UNSIGNED";
+    private static final String SQLITE_INTEGER = "INTEGER";
+    private static final String SQLITE_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    private static final String SQLITE_BIGINT = "BIGINT";
+    private static final String SQLITE_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    private static final String SQLITE_DECIMAL = "DECIMAL";
+    private static final String SQLITE_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    private static final String SQLITE_FLOAT = "FLOAT";
+    private static final String SQLITE_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    private static final String SQLITE_DOUBLE = "DOUBLE";
+    private static final String SQLITE_DOUBLE_PRECISION = "DOUBLE PRECISION";
+    private static final String SQLITE_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+    private static final String SQLITE_NUMERIC = "NUMERIC";
+    private static final String SQLITE_REAL = "REAL";
+
+    // -------------------------text----------------------------
+    private static final String SQLITE_CHAR = "CHAR";
+    private static final String SQLITE_CHARACTER = "CHARACTER";
+    private static final String SQLITE_VARYING_CHARACTER = "VARYING_CHARACTER";
+    private static final String SQLITE_NATIVE_CHARACTER = "NATIVE_CHARACTER";
+    private static final String SQLITE_NCHAR = "NCHAR";
+    private static final String SQLITE_VARCHAR = "VARCHAR";
+    private static final String SQLITE_LONGVARCHAR = "LONGVARCHAR";
+    private static final String SQLITE_LONGNVARCHAR = "LONGNVARCHAR";
+    private static final String SQLITE_NVARCHAR = "NVARCHAR";
+    private static final String SQLITE_TINYTEXT = "TINYTEXT";
+    private static final String SQLITE_MEDIUMTEXT = "MEDIUMTEXT";
+    private static final String SQLITE_TEXT = "TEXT";
+    private static final String SQLITE_LONGTEXT = "LONGTEXT";
+    private static final String SQLITE_JSON = "JSON";
+    private static final String SQLITE_CLOB = "CLOB";
+
+    // ------------------------------time(text)-------------------------
+    private static final String SQLITE_DATE = "DATE";
+    private static final String SQLITE_DATETIME = "DATETIME";
+    private static final String SQLITE_TIME = "TIME";
+    private static final String SQLITE_TIMESTAMP = "TIMESTAMP";
+
+    // ------------------------------blob-------------------------
+    private static final String SQLITE_TINYBLOB = "TINYBLOB";
+    private static final String SQLITE_MEDIUMBLOB = "MEDIUMBLOB";
+    private static final String SQLITE_BLOB = "BLOB";
+    private static final String SQLITE_LONGBLOB = "LONGBLOB";
+    private static final String SQLITE_BINARY = "BINARY";
+    private static final String SQLITE_VARBINARY = "VARBINARY";
+    private static final String SQLITE_LONGVARBINARY = "LONGVARBINARY";
+
+    @Override
+    public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
+        String columnTypeName = metadata.getColumnTypeName(colIndex).toUpperCase().trim();
+        switch (columnTypeName) {
+            case SQLITE_BIT:
+            case SQLITE_BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case SQLITE_TINYINT:
+            case SQLITE_TINYINT_UNSIGNED:
+            case SQLITE_SMALLINT:

Review Comment:
   thx, I'll test it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on code in PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#discussion_r1003052245


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteTypeMapper.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+@Slf4j
+public class SqliteTypeMapper implements JdbcDialectTypeMapper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqliteTypeMapper.class);
+
+    // ============================data types=====================
+
+    private static final String SQLITE_UNKNOWN = "UNKNOWN";
+    private static final String SQLITE_BIT = "BIT";
+    private static final String SQLITE_BOOLEAN = "BOOLEAN";
+
+    // -------------------------integer----------------------------
+    private static final String SQLITE_TINYINT = "TINYINT";
+    private static final String SQLITE_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    private static final String SQLITE_SMALLINT = "SMALLINT";
+    private static final String SQLITE_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    private static final String SQLITE_MEDIUMINT = "MEDIUMINT";
+    private static final String SQLITE_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    private static final String SQLITE_INT = "INT";
+    private static final String SQLITE_INT_UNSIGNED = "INT UNSIGNED";
+    private static final String SQLITE_INTEGER = "INTEGER";
+    private static final String SQLITE_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    private static final String SQLITE_BIGINT = "BIGINT";
+    private static final String SQLITE_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    private static final String SQLITE_DECIMAL = "DECIMAL";
+    private static final String SQLITE_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    private static final String SQLITE_FLOAT = "FLOAT";
+    private static final String SQLITE_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    private static final String SQLITE_DOUBLE = "DOUBLE";
+    private static final String SQLITE_DOUBLE_PRECISION = "DOUBLE PRECISION";
+    private static final String SQLITE_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+    private static final String SQLITE_NUMERIC = "NUMERIC";
+    private static final String SQLITE_REAL = "REAL";
+
+    // -------------------------text----------------------------
+    private static final String SQLITE_CHAR = "CHAR";
+    private static final String SQLITE_CHARACTER = "CHARACTER";
+    private static final String SQLITE_VARYING_CHARACTER = "VARYING_CHARACTER";
+    private static final String SQLITE_NATIVE_CHARACTER = "NATIVE_CHARACTER";
+    private static final String SQLITE_NCHAR = "NCHAR";
+    private static final String SQLITE_VARCHAR = "VARCHAR";
+    private static final String SQLITE_LONGVARCHAR = "LONGVARCHAR";
+    private static final String SQLITE_LONGNVARCHAR = "LONGNVARCHAR";
+    private static final String SQLITE_NVARCHAR = "NVARCHAR";
+    private static final String SQLITE_TINYTEXT = "TINYTEXT";
+    private static final String SQLITE_MEDIUMTEXT = "MEDIUMTEXT";
+    private static final String SQLITE_TEXT = "TEXT";
+    private static final String SQLITE_LONGTEXT = "LONGTEXT";
+    private static final String SQLITE_JSON = "JSON";
+    private static final String SQLITE_CLOB = "CLOB";
+
+    // ------------------------------time(text)-------------------------
+    private static final String SQLITE_DATE = "DATE";
+    private static final String SQLITE_DATETIME = "DATETIME";
+    private static final String SQLITE_TIME = "TIME";
+    private static final String SQLITE_TIMESTAMP = "TIMESTAMP";
+
+    // ------------------------------blob-------------------------
+    private static final String SQLITE_TINYBLOB = "TINYBLOB";
+    private static final String SQLITE_MEDIUMBLOB = "MEDIUMBLOB";
+    private static final String SQLITE_BLOB = "BLOB";
+    private static final String SQLITE_LONGBLOB = "LONGBLOB";
+    private static final String SQLITE_BINARY = "BINARY";
+    private static final String SQLITE_VARBINARY = "VARBINARY";
+    private static final String SQLITE_LONGVARBINARY = "LONGVARBINARY";
+
+    @Override
+    public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
+        String columnTypeName = metadata.getColumnTypeName(colIndex).toUpperCase().trim();
+        switch (columnTypeName) {
+            case SQLITE_BIT:
+            case SQLITE_BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case SQLITE_TINYINT:
+            case SQLITE_TINYINT_UNSIGNED:
+            case SQLITE_SMALLINT:

Review Comment:
   We also support short type, you can use it to map smallint and tinyint.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#discussion_r1008707638


##########
docs/en/connector-v2/source/Jdbc.md:
##########
@@ -90,17 +90,18 @@ in parallel according to the concurrency of tasks.
 
 there are some reference value for params above.
 
-| datasource | driver                                       | url                                                          | maven                                                        |
-| ---------- | -------------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
-| mysql      | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                             | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
-| postgresql | org.postgresql.Driver                        | jdbc:postgresql://localhost:5432/postgres                    | https://mvnrepository.com/artifact/org.postgresql/postgresql |
-| dm         | dm.jdbc.driver.DmDriver                      | jdbc:dm://localhost:5236                                     | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
-| phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
-| sqlserver  | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433                    | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
-| oracle     | oracle.jdbc.OracleDriver                     | jdbc:oracle:thin:@localhost:1521/xepdb1                      | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
-| sqlite     | org.sqlite.JDBC                              | jdbc:sqlite:test.db                                          | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc    |
-| gbase8a    | com.gbase.jdbc.Driver                        | jdbc:gbase://e2e_gbase8aDb:5258/test                         | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar |
-| starrocks  | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                             | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
+| datasource | driver                                       | url                                                                | maven                                                                                                       |
+|------------|----------------------------------------------|--------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
+| mysql      | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                                   | https://mvnrepository.com/artifact/mysql/mysql-connector-java                                               |
+| postgresql | org.postgresql.Driver                        | jdbc:postgresql://localhost:5432/postgres                          | https://mvnrepository.com/artifact/org.postgresql/postgresql                                                |
+| dm         | dm.jdbc.driver.DmDriver                      | jdbc:dm://localhost:5236                                           | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18                                                |
+| phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client                        |
+| sqlserver  | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433                          | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc                                       |
+| oracle     | oracle.jdbc.OracleDriver                     | jdbc:oracle:thin:@localhost:1521/xepdb1                            | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8                                          |
+| sqlite     | org.sqlite.JDBC                              | jdbc:sqlite:test.db                                                | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc    |

Review Comment:
   same as above.



##########
docs/en/connector-v2/sink/Jdbc.md:
##########
@@ -108,17 +108,18 @@ In the case of is_exactly_once = "true", Xa transactions are used. This requires
 
 there are some reference value for params above.
 
-| datasource | driver                                       | url                                                          | xa_data_source_class_name                          | maven                                                        |
-|------------| -------------------------------------------- | ------------------------------------------------------------ | -------------------------------------------------- | ------------------------------------------------------------ |
-| MySQL      | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                             | com.mysql.cj.jdbc.MysqlXADataSource                | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
-| PostgreSQL | org.postgresql.Driver                        | jdbc:postgresql://localhost:5432/postgres                    | org.postgresql.xa.PGXADataSource                   | https://mvnrepository.com/artifact/org.postgresql/postgresql |
-| DM         | dm.jdbc.driver.DmDriver                      | jdbc:dm://localhost:5236                                     | dm.jdbc.driver.DmdbXADataSource                    | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
-| Phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /                                                  | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
-| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433                    | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
-| Oracle     | oracle.jdbc.OracleDriver                     | jdbc:oracle:thin:@localhost:1521/xepdb1                      | oracle.jdbc.xa.OracleXADataSource                  | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
-| sqlite     | org.sqlite.JDBC                              | Jdbc:sqlite:test.db                                          | /                                                  | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc    |
-| GBase8a    | com.gbase.jdbc.Driver                        | jdbc:gbase://e2e_gbase8aDb:5258/test                         | /                                                  | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar |
-| StarRocks  | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                             | /                                                  | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
+| datasource | driver                                       | url                                                                | xa_data_source_class_name                          | maven                                                                                                       |
+|------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
+| MySQL      | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                                   | com.mysql.cj.jdbc.MysqlXADataSource                | https://mvnrepository.com/artifact/mysql/mysql-connector-java                                               |
+| PostgreSQL | org.postgresql.Driver                        | jdbc:postgresql://localhost:5432/postgres                          | org.postgresql.xa.PGXADataSource                   | https://mvnrepository.com/artifact/org.postgresql/postgresql                                                |
+| DM         | dm.jdbc.driver.DmDriver                      | jdbc:dm://localhost:5236                                           | dm.jdbc.driver.DmdbXADataSource                    | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18                                                |
+| Phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /                                                  | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client                        |
+| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433                          | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc                                       |
+| Oracle     | oracle.jdbc.OracleDriver                     | jdbc:oracle:thin:@localhost:1521/xepdb1                            | oracle.jdbc.xa.OracleXADataSource                  | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8                                          |
+| sqlite     | org.sqlite.JDBC                              | Jdbc:sqlite:test.db                                                | /                                                  | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc    |

Review Comment:
   Please add `changed log` reference https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/connector-v2/sink/SftpFile.md



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] nutsjian commented on a diff in pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
nutsjian commented on code in PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#discussion_r1010185587


##########
docs/en/connector-v2/source/Jdbc.md:
##########
@@ -90,17 +90,18 @@ in parallel according to the concurrency of tasks.
 
 there are some reference value for params above.
 
-| datasource | driver                                       | url                                                          | maven                                                        |
-| ---------- | -------------------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
-| mysql      | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                             | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
-| postgresql | org.postgresql.Driver                        | jdbc:postgresql://localhost:5432/postgres                    | https://mvnrepository.com/artifact/org.postgresql/postgresql |
-| dm         | dm.jdbc.driver.DmDriver                      | jdbc:dm://localhost:5236                                     | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
-| phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
-| sqlserver  | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433                    | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
-| oracle     | oracle.jdbc.OracleDriver                     | jdbc:oracle:thin:@localhost:1521/xepdb1                      | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
-| sqlite     | org.sqlite.JDBC                              | jdbc:sqlite:test.db                                          | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc    |
-| gbase8a    | com.gbase.jdbc.Driver                        | jdbc:gbase://e2e_gbase8aDb:5258/test                         | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar |
-| starrocks  | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                             | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
+| datasource | driver                                       | url                                                                | maven                                                                                                       |
+|------------|----------------------------------------------|--------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
+| mysql      | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                                   | https://mvnrepository.com/artifact/mysql/mysql-connector-java                                               |
+| postgresql | org.postgresql.Driver                        | jdbc:postgresql://localhost:5432/postgres                          | https://mvnrepository.com/artifact/org.postgresql/postgresql                                                |
+| dm         | dm.jdbc.driver.DmDriver                      | jdbc:dm://localhost:5236                                           | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18                                                |
+| phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client                        |
+| sqlserver  | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433                          | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc                                       |
+| oracle     | oracle.jdbc.OracleDriver                     | jdbc:oracle:thin:@localhost:1521/xepdb1                            | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8                                          |
+| sqlite     | org.sqlite.JDBC                              | jdbc:sqlite:test.db                                                | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc    |

Review Comment:
   done.



##########
docs/en/connector-v2/sink/Jdbc.md:
##########
@@ -108,17 +108,18 @@ In the case of is_exactly_once = "true", Xa transactions are used. This requires
 
 there are some reference value for params above.
 
-| datasource | driver                                       | url                                                          | xa_data_source_class_name                          | maven                                                        |
-|------------| -------------------------------------------- | ------------------------------------------------------------ | -------------------------------------------------- | ------------------------------------------------------------ |
-| MySQL      | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                             | com.mysql.cj.jdbc.MysqlXADataSource                | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
-| PostgreSQL | org.postgresql.Driver                        | jdbc:postgresql://localhost:5432/postgres                    | org.postgresql.xa.PGXADataSource                   | https://mvnrepository.com/artifact/org.postgresql/postgresql |
-| DM         | dm.jdbc.driver.DmDriver                      | jdbc:dm://localhost:5236                                     | dm.jdbc.driver.DmdbXADataSource                    | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 |
-| Phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /                                                  | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
-| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433                    | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
-| Oracle     | oracle.jdbc.OracleDriver                     | jdbc:oracle:thin:@localhost:1521/xepdb1                      | oracle.jdbc.xa.OracleXADataSource                  | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
-| sqlite     | org.sqlite.JDBC                              | Jdbc:sqlite:test.db                                          | /                                                  | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc    |
-| GBase8a    | com.gbase.jdbc.Driver                        | jdbc:gbase://e2e_gbase8aDb:5258/test                         | /                                                  | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar |
-| StarRocks  | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                             | /                                                  | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
+| datasource | driver                                       | url                                                                | xa_data_source_class_name                          | maven                                                                                                       |
+|------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
+| MySQL      | com.mysql.cj.jdbc.Driver                     | jdbc:mysql://localhost:3306/test                                   | com.mysql.cj.jdbc.MysqlXADataSource                | https://mvnrepository.com/artifact/mysql/mysql-connector-java                                               |
+| PostgreSQL | org.postgresql.Driver                        | jdbc:postgresql://localhost:5432/postgres                          | org.postgresql.xa.PGXADataSource                   | https://mvnrepository.com/artifact/org.postgresql/postgresql                                                |
+| DM         | dm.jdbc.driver.DmDriver                      | jdbc:dm://localhost:5236                                           | dm.jdbc.driver.DmdbXADataSource                    | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18                                                |
+| Phoenix    | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /                                                  | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client                        |
+| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433                          | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc                                       |
+| Oracle     | oracle.jdbc.OracleDriver                     | jdbc:oracle:thin:@localhost:1521/xepdb1                            | oracle.jdbc.xa.OracleXADataSource                  | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8                                          |
+| sqlite     | org.sqlite.JDBC                              | Jdbc:sqlite:test.db                                                | /                                                  | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc    |

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] liugddx commented on a diff in pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
liugddx commented on code in PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#discussion_r1022345187


##########
seatunnel-connectors-v2/connector-jdbc/pom.xml:
##########
@@ -130,9 +142,17 @@
             <artifactId>ojdbc8</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.xerial</groupId>
+            <artifactId>sqlite-jdbc</artifactId>
+         </dependency>
+         <dependency>
             <groupId>com.ibm.db2.jcc</groupId>
             <artifactId>db2jcc</artifactId>
         </dependency>
+        <dependency>

Review Comment:
   duplicated code



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] nutsjian commented on pull request #3089: [Feature][Connector-V2][JDBC] support sqlite

Posted by GitBox <gi...@apache.org>.
nutsjian commented on PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#issuecomment-1279764050

   @EricJoy2048 thx, man. I fixed the e2e test. Please approve running workflows.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3089: [Feature][Connector-V2][JDBC] support sqlite

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#discussion_r994742560


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlite_source_and_sink.conf:
##########
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+    # You can set flink configuration here
+    execution.parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+    Jdbc {
+        driver = org.sqlite.JDBC
+        url = "jdbc:sqlite:/tmp/seatunnel/data/test.db"
+        user = ""
+        password = ""
+        type_affinity = true
+        query = "select age, name from source"

Review Comment:
   good



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] nutsjian commented on a diff in pull request #3089: [Feature][Connector-V2][JDBC] support sqlite

Posted by GitBox <gi...@apache.org>.
nutsjian commented on code in PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#discussion_r997691805


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqliteIT.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+@Slf4j
+public class JdbcSqliteIT extends FlinkContainer {
+    private String tmpdir;
+    private Config config;
+    private static final List<List<Object>> TEST_DATASET = generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.39.3.0/sqlite-jdbc-3.39.3.0.jar";
+
+    private Connection jdbcConnection;
+
+    @Test

Review Comment:
   thx, I'll fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] nutsjian commented on pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
nutsjian commented on PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#issuecomment-1311137965

   @EricJoy2048 I have add flink/spark elasticsearch e2e module in pom.xml, please help me rerun the CI workflows. thx.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] nutsjian commented on a diff in pull request #3089: [Feature][Connector-V2][JDBC] support sqlite Source & Sink

Posted by GitBox <gi...@apache.org>.
nutsjian commented on code in PR #3089:
URL: https://github.com/apache/incubator-seatunnel/pull/3089#discussion_r1024680393


##########
seatunnel-connectors-v2/connector-jdbc/pom.xml:
##########
@@ -78,20 +80,30 @@
                 <version>${oracle.version}</version>
                 <scope>provided</scope>
             </dependency>
+            <dependency>
+                <groupId>org.xerial</groupId>
+                <artifactId>sqlite-jdbc</artifactId>
+                <version>${sqlite.version}</version>
+                <scope>provided</scope>
+            </dependency>
             <dependency>
                 <groupId>com.ibm.db2.jcc</groupId>
                 <artifactId>db2jcc</artifactId>
                 <version>${db2.version}</version>
                 <scope>provided</scope>
             </dependency>
-
+            <dependency>
+                <groupId>org.xerial</groupId>
+                <artifactId>sqlite-jdbc</artifactId>
+                <version>${sqlite.version}</version>
+                <scope>provided</scope>

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org