You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/19 09:15:29 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector-V2][Jdbc] support gbase 8a (#3026)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new dc6e85d06 [Feature][Connector-V2][Jdbc] support gbase 8a (#3026)
dc6e85d06 is described below
commit dc6e85d06f08e6b0bdcbb93d1cc5ed0e5c213385
Author: liugddx <80...@qq.com>
AuthorDate: Wed Oct 19 17:15:24 2022 +0800
[Feature][Connector-V2][Jdbc] support gbase 8a (#3026)
* gbase 8a connector
* add gbase8a e2e test
---
docs/en/connector-v2/sink/Jdbc.md | 1 +
docs/en/connector-v2/source/Jdbc.md | 1 +
.../internal/dialect/gbase8a/Gbase8aDialect.java | 43 +++---
.../dialect/gbase8a/Gbase8aDialectFactory.java | 40 ++---
.../dialect/gbase8a/Gbase8aJdbcRowConverter.java | 42 +++--
.../Gbase8aTypeMapper.java} | 97 ++++++------
.../internal/dialect/oracle/OracleTypeMapper.java | 7 +-
.../connector-jdbc-e2e/pom.xml | 6 +
.../connectors/seatunnel/jdbc/AbstractJdbcIT.java | 64 +++++---
.../connectors/seatunnel/jdbc/JdbcCase.java | 1 +
.../connectors/seatunnel/jdbc/JdbcGbse8adbIT.java | 117 ++++++++++++++
.../connectors/seatunnel/jdbc/JdbcOracledbIT.java | 45 +++---
.../seatunnel/jdbc/JdbcStarRocksdbIT.java | 169 +++++++++++----------
.../resources/jdbc_gbase8a_source_to_assert.conf | 67 ++++++++
14 files changed, 441 insertions(+), 259 deletions(-)
diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md
index 989e69b2c..79056ee4d 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -116,6 +116,7 @@ there are some reference value for params above.
| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | / | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
| oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | oracle.jdbc.xa.OracleXADataSource | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
+| gbase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | / | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar |
| starrocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
## Example
diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md
index 0d3ca3981..546ecd26e 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -98,6 +98,7 @@ there are some reference value for params above.
| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client |
| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
| oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
+| gbase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar |
| starrocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java |
## Example
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialect.java
similarity index 51%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
copy to seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialect.java
index fd182ce3b..b90097445 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialect.java
@@ -15,34 +15,25 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.jdbc;
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.gbase8a;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
-import lombok.Builder;
-import lombok.Data;
+public class Gbase8aDialect implements JdbcDialect {
+ @Override
+ public String dialectName() {
+ return "Gbase8a";
+ }
-import java.util.Map;
+ @Override
+ public JdbcRowConverter getRowConverter() {
+ return new Gbase8aJdbcRowConverter();
+ }
-@Data
-@Builder
-public class JdbcCase {
- private String dockerImage;
- private String networkAliases;
- private String driverClass;
- private String host;
- private String userName;
- private String password;
- private int port;
- private String dataBase;
- private String sourceTable;
- private String sinkTable;
- private String driverJar;
- private String jdbcUrl;
- private String ddlSource;
- private String ddlSink;
- private String initDataSql;
- private String configFile;
- private SeaTunnelRow seaTunnelRow;
- private Map<String, String> containerEnv;
+ @Override
+ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+ return new Gbase8aTypeMapper();
+ }
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialectFactory.java
similarity index 51%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
copy to seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialectFactory.java
index fd182ce3b..31787f497 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialectFactory.java
@@ -15,34 +15,22 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.jdbc;
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.gbase8a;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
-import lombok.Builder;
-import lombok.Data;
+import com.google.auto.service.AutoService;
-import java.util.Map;
+@AutoService(JdbcDialectFactory.class)
+public class Gbase8aDialectFactory implements JdbcDialectFactory {
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:gbase:");
+ }
-@Data
-@Builder
-public class JdbcCase {
- private String dockerImage;
- private String networkAliases;
- private String driverClass;
- private String host;
- private String userName;
- private String password;
- private int port;
- private String dataBase;
- private String sourceTable;
- private String sinkTable;
- private String driverJar;
- private String jdbcUrl;
- private String ddlSource;
- private String ddlSink;
- private String initDataSql;
- private String configFile;
- private SeaTunnelRow seaTunnelRow;
- private Map<String, String> containerEnv;
+ @Override
+ public JdbcDialect create() {
+ return new Gbase8aDialect();
+ }
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aJdbcRowConverter.java
similarity index 53%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
copy to seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aJdbcRowConverter.java
index fd182ce3b..6ada1d39e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aJdbcRowConverter.java
@@ -15,34 +15,26 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.jdbc;
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.gbase8a;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
-import lombok.Builder;
-import lombok.Data;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
-import java.util.Map;
+public class Gbase8aJdbcRowConverter extends AbstractJdbcRowConverter {
+ @Override
+ public String converterName() {
+ return "Gbase8a";
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @Override
+ public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException {
+ return super.toInternal(rs, metaData, typeInfo);
+ }
-@Data
-@Builder
-public class JdbcCase {
- private String dockerImage;
- private String networkAliases;
- private String driverClass;
- private String host;
- private String userName;
- private String password;
- private int port;
- private String dataBase;
- private String sourceTable;
- private String sinkTable;
- private String driverJar;
- private String jdbcUrl;
- private String ddlSource;
- private String ddlSink;
- private String initDataSql;
- private String configFile;
- private SeaTunnelRow seaTunnelRow;
- private Map<String, String> containerEnv;
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aTypeMapper.java
similarity index 52%
copy from seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java
copy to seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aTypeMapper.java
index a11a8c9c7..65677ea22 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aTypeMapper.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.gbase8a;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
@@ -30,88 +30,81 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@Slf4j
-public class OracleTypeMapper implements JdbcDialectTypeMapper {
+public class Gbase8aTypeMapper implements JdbcDialectTypeMapper {
+ //ref http://www.gbase.cn/down/4419.html
// ============================data types=====================
-
- private static final String ORACLE_UNKNOWN = "UNKNOWN";
+ private static final String GBASE8A_UNKNOWN = "UNKNOWN";
// -------------------------number----------------------------
- private static final String ORACLE_BINARY_DOUBLE = "BINARY_DOUBLE";
- private static final String ORACLE_BINARY_FLOAT = "BINARY_FLOAT";
- private static final String ORACLE_NUMBER = "NUMBER";
- private static final String ORACLE_FLOAT = "FLOAT";
- private static final String ORACLE_REAL = "REAL";
- private static final String ORACLE_INTEGER = "INTEGER";
+ private static final String GBASE8A_INT = "INT";
+ private static final String GBASE8A_TINYINT = "TINYINT";
+ private static final String GBASE8A_SMALLINT = "SMALLINT";
+ private static final String GBASE8A_BIGINT = "BIGINT";
+ private static final String GBASE8A_DECIMAL = "DECIMAL";
+ private static final String GBASE8A_FLOAT = "FLOAT";
+ private static final String GBASE8A_DOUBLE = "DOUBLE";
// -------------------------string----------------------------
- private static final String ORACLE_CHAR = "CHAR";
- private static final String ORACLE_VARCHAR2 = "VARCHAR2";
- private static final String ORACLE_NCHAR = "NCHAR";
- private static final String ORACLE_NVARCHAR2 = "NVARCHAR2";
- private static final String ORACLE_LONG = "LONG";
- private static final String ORACLE_ROWID = "ROWID";
- private static final String ORACLE_CLOB = "CLOB";
- private static final String ORACLE_NCLOB = "NCLOB";
+ private static final String GBASE8A_CHAR = "CHAR";
+ private static final String GBASE8A_VARCHAR = "VARCHAR";
+
// ------------------------------time-------------------------
- private static final String ORACLE_DATE = "DATE";
- private static final String ORACLE_TIMESTAMP = "TIMESTAMP";
- private static final String ORACLE_TIMESTAMP_WITH_LOCAL_TIME_ZONE = "TIMESTAMP WITH LOCAL TIME ZONE";
+ private static final String GBASE8A_DATE = "DATE";
+ private static final String GBASE8A_TIME = "TIME";
+ private static final String GBASE8A_TIMESTAMP = "TIMESTAMP";
+ private static final String GBASE8A_DATETIME = "DATETIME";
// ------------------------------blob-------------------------
- private static final String ORACLE_BLOB = "BLOB";
- private static final String ORACLE_BFILE = "BFILE";
- private static final String ORACLE_RAW = "RAW";
- private static final String ORACLE_LONG_RAW = "LONG RAW";
+ private static final String GBASE8A_BLOB = "BLOB";
+ private static final String GBASE8A_TEXT = "TEXT";
@SuppressWarnings("checkstyle:MagicNumber")
@Override
public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
- String oracleType = metadata.getColumnTypeName(colIndex).toUpperCase();
- String columnName = metadata.getColumnName(colIndex);
+ String gbase8aType = metadata.getColumnTypeName(colIndex).toUpperCase();
int precision = metadata.getPrecision(colIndex);
int scale = metadata.getScale(colIndex);
- switch (oracleType) {
- case ORACLE_INTEGER:
+ switch (gbase8aType) {
+ case GBASE8A_TINYINT:
+ return BasicType.BYTE_TYPE;
+ case GBASE8A_SMALLINT:
+ return BasicType.SHORT_TYPE;
+ case GBASE8A_INT:
return BasicType.INT_TYPE;
- case ORACLE_NUMBER:
+ case GBASE8A_BIGINT:
+ return BasicType.LONG_TYPE;
+ case GBASE8A_DECIMAL:
if (precision < 38) {
return new DecimalType(precision, scale);
}
return new DecimalType(38, 18);
- case ORACLE_FLOAT:
- case ORACLE_BINARY_DOUBLE:
+ case GBASE8A_DOUBLE:
return BasicType.DOUBLE_TYPE;
- case ORACLE_BINARY_FLOAT:
- case ORACLE_REAL:
+ case GBASE8A_FLOAT:
return BasicType.FLOAT_TYPE;
- case ORACLE_CHAR:
- case ORACLE_NCHAR:
- case ORACLE_NVARCHAR2:
- case ORACLE_VARCHAR2:
- case ORACLE_LONG:
- case ORACLE_ROWID:
- case ORACLE_NCLOB:
- case ORACLE_CLOB:
+ case GBASE8A_CHAR:
+ case GBASE8A_VARCHAR:
return BasicType.STRING_TYPE;
- case ORACLE_DATE:
- case ORACLE_TIMESTAMP:
- case ORACLE_TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ case GBASE8A_DATE:
+ return LocalTimeType.LOCAL_DATE_TYPE;
+ case GBASE8A_TIME:
+ return LocalTimeType.LOCAL_TIME_TYPE;
+ case GBASE8A_TIMESTAMP:
+ case GBASE8A_DATETIME:
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
- case ORACLE_BLOB:
- case ORACLE_RAW:
- case ORACLE_LONG_RAW:
- case ORACLE_BFILE:
+ case GBASE8A_BLOB:
+ case GBASE8A_TEXT:
return PrimitiveByteArrayType.INSTANCE;
//Doesn't support yet
- case ORACLE_UNKNOWN:
+ case GBASE8A_UNKNOWN:
default:
final String jdbcColumnName = metadata.getColumnName(colIndex);
throw new UnsupportedOperationException(
String.format(
- "Doesn't support ORACLE type '%s' on column '%s' yet.",
- oracleType, jdbcColumnName));
+ "Doesn't support GBASE8A type '%s' on column '%s' yet.",
+ gbase8aType, jdbcColumnName));
}
}
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java
index a11a8c9c7..eb1f9097f 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleTypeMapper.java
@@ -75,12 +75,11 @@ public class OracleTypeMapper implements JdbcDialectTypeMapper {
switch (oracleType) {
case ORACLE_INTEGER:
return BasicType.INT_TYPE;
+ case ORACLE_FLOAT:
case ORACLE_NUMBER:
- if (precision < 38) {
- return new DecimalType(precision, scale);
- }
+ //The float type will be converted to DecimalType(10, -127),
+ // which will lose precision in the spark engine
return new DecimalType(38, 18);
- case ORACLE_FLOAT:
case ORACLE_BINARY_DOUBLE:
return BasicType.DOUBLE_TYPE;
case ORACLE_BINARY_FLOAT:
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml
index 000ce2c49..2a6516772 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/pom.xml
@@ -45,6 +45,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-assert</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<!-- jdbc containers -->
<dependency>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
index 3082881da..4e9088003 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc;
import static org.awaitility.Awaitility.given;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
@@ -28,6 +29,7 @@ import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
@@ -37,11 +39,15 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
import java.sql.Connection;
-import java.sql.DriverManager;
+import java.sql.Driver;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@@ -49,10 +55,8 @@ import java.util.stream.Stream;
public abstract class AbstractJdbcIT extends TestSuiteBase implements TestResource {
protected static final String HOST = "HOST";
- protected Connection jdbcConnection;
protected GenericContainer<?> dbServer;
- private JdbcCase jdbcCase;
- private String jdbcUrl;
+ protected JdbcCase jdbcCase;
abstract JdbcCase getJdbcCase();
@@ -62,13 +66,18 @@ public abstract class AbstractJdbcIT extends TestSuiteBase implements TestResour
abstract SeaTunnelRow initTestData();
+ protected Connection createAndChangeDatabase(Connection connection) {
+ //do nothing
+ return connection;
+ }
+
@TestContainerExtension
private final ContainerExtendedFactory extendedFactory = container -> {
Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + jdbcCase.getDriverJar());
Assertions.assertEquals(0, extraCommands.getExitCode());
};
- private void getContainer() throws ClassNotFoundException, SQLException {
+ private void getContainer() throws SQLException, MalformedURLException, ClassNotFoundException, InstantiationException, IllegalAccessException {
jdbcCase = this.getJdbcCase();
dbServer = new GenericContainer<>(jdbcCase.getDockerImage())
.withNetwork(NETWORK)
@@ -78,21 +87,28 @@ public abstract class AbstractJdbcIT extends TestSuiteBase implements TestResour
dbServer.setPortBindings(Lists.newArrayList(
String.format("%s:%s", jdbcCase.getPort(), jdbcCase.getPort())));
Startables.deepStart(Stream.of(dbServer)).join();
- Class.forName(jdbcCase.getDriverClass());
+
given().ignoreExceptions()
.await()
.atMost(180, TimeUnit.SECONDS)
- .untilAsserted(this::initializeJdbcConnection);
- initializeJdbcTable();
+ .untilAsserted(() -> {
+ this.initializeJdbcConnection(jdbcCase.getJdbcUrl());
+ });
+ this.initializeJdbcTable();
}
- protected void initializeJdbcConnection() throws SQLException {
- jdbcUrl = jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost());
- jdbcConnection = DriverManager.getConnection(jdbcUrl, jdbcCase.getUserName(), jdbcCase.getPassword());
+ protected Connection initializeJdbcConnection(String jdbcUrl) throws SQLException, ClassNotFoundException, MalformedURLException, InstantiationException, IllegalAccessException {
+ URLClassLoader urlClassLoader = new URLClassLoader(new URL[]{new URL(jdbcCase.getDriverJar())}, AbstractJdbcIT.class.getClassLoader());
+ Thread.currentThread().setContextClassLoader(urlClassLoader);
+ Driver driver = (Driver) urlClassLoader.loadClass(jdbcCase.getDriverClass()).newInstance();
+ Properties props = new Properties();
+ props.put("user", jdbcCase.getUserName());
+ props.put("password", jdbcCase.getPassword());
+ return driver.connect(jdbcUrl.replace(HOST, dbServer.getHost()), props);
}
private void batchInsertData() {
- try (Connection connection = DriverManager.getConnection(jdbcUrl, jdbcCase.getUserName(), jdbcCase.getPassword())) {
+ try (Connection connection = initializeJdbcConnection(String.format(jdbcCase.getJdbcTemplate(), jdbcCase.getPort(), jdbcCase.getDataBase()))) {
connection.setAutoCommit(false);
try (PreparedStatement preparedStatement = connection.prepareStatement(jdbcCase.getInitDataSql())) {
@@ -102,20 +118,25 @@ public abstract class AbstractJdbcIT extends TestSuiteBase implements TestResour
preparedStatement.execute();
}
connection.commit();
- } catch (SQLException exception) {
- exception.printStackTrace();
+ } catch (Exception exception) {
+ log.error(ExceptionUtils.getMessage(exception));
+ throw new RuntimeException("get connection error", exception);
}
}
- private void initializeJdbcTable() throws SQLException {
- try (Connection connection = DriverManager.getConnection(jdbcUrl, jdbcCase.getUserName(), jdbcCase.getPassword())) {
- Statement statement = connection.createStatement();
+ private void initializeJdbcTable() {
+ try (Connection connection = initializeJdbcConnection(jdbcCase.getJdbcUrl())) {
+ Connection newConnection = createAndChangeDatabase(connection);
+ Statement statement = newConnection.createStatement();
String createSource = jdbcCase.getDdlSource();
String createSink = jdbcCase.getDdlSink();
statement.execute(createSource);
- statement.execute(createSink);
- } catch (SQLException exception) {
- exception.printStackTrace();
+ if (StringUtils.isNotEmpty(createSink)) {
+ statement.execute(createSink);
+ }
+ } catch (Exception exception) {
+ log.error(ExceptionUtils.getMessage(exception));
+ throw new RuntimeException("get connection error", exception);
}
this.batchInsertData();
}
@@ -128,9 +149,6 @@ public abstract class AbstractJdbcIT extends TestSuiteBase implements TestResour
@Override
public void tearDown() throws Exception {
- if (jdbcConnection != null) {
- jdbcConnection.close();
- }
if (dbServer != null) {
dbServer.close();
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
index fd182ce3b..d4659a9b3 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
@@ -38,6 +38,7 @@ public class JdbcCase {
private String sourceTable;
private String sinkTable;
private String driverJar;
+ private String jdbcTemplate;
private String jdbcUrl;
private String ddlSource;
private String ddlSink;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java
new file mode 100644
index 000000000..00efd4f1b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.Map;
+
+public class JdbcGbse8adbIT extends AbstractJdbcIT {
+
+ private static final String DOCKER_IMAGE = "shihd/gbase8a:1.0";
+ private static final String NETWORK_ALIASES = "e2e_gbase8aDb";
+ private static final String DRIVER_CLASS = "com.gbase.jdbc.Driver";
+ private static final int PORT = 5258;
+ private static final String URL = "jdbc:gbase://" + HOST + ":%s/%s?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true";
+ private static final String USERNAME = "root";
+ private static final String PASSWORD = "root";
+ private static final String DATABASE = "gbase";
+ private static final String SOURCE_TABLE = "e2e_table_source";
+ private static final String DRIVER_JAR = "https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar";
+ private static final String CONFIG_FILE = "/jdbc_gbase8a_source_to_assert.conf";
+ private static final String DDL_SOURCE = "CREATE TABLE " + SOURCE_TABLE + "(\n" +
+ " \"varchar_10_col\" varchar(10) DEFAULT NULL,\n" +
+ " \"char_10_col\" char(10) DEFAULT NULL,\n" +
+ " \"text_col\" text,\n" +
+ " \"decimal_col\" decimal(10,0) DEFAULT NULL,\n" +
+ " \"float_col\" float(12,0) DEFAULT NULL,\n" +
+ " \"int_col\" int(11) DEFAULT NULL,\n" +
+ " \"tinyint_col\" tinyint(4) DEFAULT NULL,\n" +
+ " \"smallint_col\" smallint(6) DEFAULT NULL,\n" +
+ " \"double_col\" double(22,0) DEFAULT NULL,\n" +
+ " \"bigint_col\" bigint(20) DEFAULT NULL,\n" +
+ " \"date_col\" date DEFAULT NULL,\n" +
+ " \"timestamp_col\" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,\n" +
+ " \"datetime_col\" datetime DEFAULT NULL,\n" +
+ " \"blob_col\" blob\n" +
+ ")";
+ private static final String INIT_DATA_SQL = "insert into " + SOURCE_TABLE + " (\n" +
+ " varchar_10_col,\n" +
+ " char_10_col,\n" +
+ " text_col,\n" +
+ " decimal_col,\n" +
+ " float_col,\n" +
+ " int_col,\n" +
+ " tinyint_col,\n" +
+ " smallint_col,\n" +
+ " double_col,\n" +
+ " bigint_col,\n" +
+ " date_col,\n" +
+ " timestamp_col,\n" +
+ " datetime_col,\n" +
+ " blob_col\n" +
+ ")values(\n" +
+ "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?\n" +
+ ")";
+
+ @Override
+ JdbcCase getJdbcCase() {
+ Map<String, String> containerEnv = new HashMap<>();
+ String jdbcUrl = String.format(URL, PORT, DATABASE);
+ return JdbcCase.builder().dockerImage(DOCKER_IMAGE).networkAliases(NETWORK_ALIASES).containerEnv(containerEnv).driverClass(DRIVER_CLASS)
+ .host(HOST).port(PORT).jdbcTemplate(URL).jdbcUrl(jdbcUrl).userName(USERNAME).password(PASSWORD).dataBase(DATABASE)
+ .sourceTable(SOURCE_TABLE).driverJar(DRIVER_JAR)
+ .ddlSource(DDL_SOURCE).initDataSql(INIT_DATA_SQL).configFile(CONFIG_FILE).seaTunnelRow(initTestData()).build();
+ }
+
+ @Override
+ void compareResult() {
+ //do nothing
+ }
+
+ @Override
+ void clearSinkTable() {
+ //do nothing
+ }
+
+ @Override
+ SeaTunnelRow initTestData() {
+ return new SeaTunnelRow(
+ new Object[]{"varchar", "char10col1", "text_col".getBytes(StandardCharsets.UTF_8), 122, 122.0, 122, 100, 1212, 122.0,
+ 3112121, new java.sql.Date(LocalDate.now().toEpochDay()), new Timestamp(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)), new Timestamp(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)), "blob".getBytes(StandardCharsets.UTF_8)});
+ }
+
+ protected Connection createAndChangeDatabase(Connection connection) {
+ try {
+ connection.prepareStatement("CREATE DATABASE test").executeUpdate();
+ jdbcCase.setDataBase("test");
+ connection.close();
+ return initializeJdbcConnection(String.format(URL, PORT, jdbcCase.getDataBase()));
+ } catch (Exception e) {
+ throw new RuntimeException("create database error", e);
+ }
+
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracledbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracledbIT.java
index 48e34c0c7..036415f69 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracledbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracledbIT.java
@@ -26,6 +26,7 @@ import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -98,7 +99,7 @@ public class JdbcOracledbIT extends AbstractJdbcIT {
" raw_col,\n" +
" blob_col\n" +
")values(\n" +
- "\t?,?,?,?,?,?,?,?,?,?,?,?,?,rawtohex(?),rawtohex(?)\n" +
+ "\t?,?,?,?,?,?,?,?,?,?,?,?,rawtohex(?),rawtohex(?)\n" +
")";
@Override
@@ -109,7 +110,7 @@ public class JdbcOracledbIT extends AbstractJdbcIT {
containerEnv.put("APP_USER_PASSWORD", PASSWORD);
String jdbcUrl = String.format(URL, PORT, DATABASE);
return JdbcCase.builder().dockerImage(DOCKER_IMAGE).networkAliases(NETWORK_ALIASES).containerEnv(containerEnv).driverClass(DRIVER_CLASS)
- .host(HOST).port(PORT).jdbcUrl(jdbcUrl).userName(USERNAME).password(PASSWORD).dataBase(DATABASE)
+ .host(HOST).port(PORT).jdbcTemplate(URL).jdbcUrl(jdbcUrl).userName(USERNAME).password(PASSWORD).dataBase(DATABASE)
.sourceTable(SOURCE_TABLE).sinkTable(SINK_TABLE).driverJar(DRIVER_JAR)
.ddlSource(DDL_SOURCE).ddlSink(DDL_SINK).initDataSql(INIT_DATA_SQL).configFile(CONFIG_FILE).seaTunnelRow(initTestData()).build();
}
@@ -119,35 +120,39 @@ public class JdbcOracledbIT extends AbstractJdbcIT {
String sourceSql = "select * from " + SOURCE_TABLE;
String sinkSql = "select * from " + SINK_TABLE;
List<String> columns = Lists.newArrayList("varchar_10_col", "char_10_col", "clob_col", "number_3_sf_2_dp", "integer_col", "float_col", "real_col", "binary_float_col", "binary_double_col", "date_col", "timestamp_with_3_frac_sec_col", "timestamp_with_local_tz", "raw_col", "blob_col");
- Statement sourceStatement = jdbcConnection.createStatement();
- Statement sinkStatement = jdbcConnection.createStatement();
- ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql);
- ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
- while (sourceResultSet.next()) {
- if (sinkResultSet.next()) {
- for (String column : columns) {
- Object source = sourceResultSet.getObject(column);
- Object sink = sinkResultSet.getObject(column);
- if (!Objects.deepEquals(source, sink)) {
+ try (Connection connection = initializeJdbcConnection(jdbcCase.getJdbcUrl())) {
+ Statement sourceStatement = connection.createStatement();
+ Statement sinkStatement = connection.createStatement();
+ ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql);
+ ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+ while (sourceResultSet.next()) {
+ if (sinkResultSet.next()) {
+ for (String column : columns) {
+ Object source = sourceResultSet.getObject(column);
+ Object sink = sinkResultSet.getObject(column);
+ if (!Objects.deepEquals(source, sink)) {
- InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column);
- InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column);
- String sourceValue = IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8);
- String sinkValue = IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8);
- Assertions.assertEquals(sourceValue, sinkValue);
+ InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column);
+ InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column);
+ String sourceValue = IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8);
+ String sinkValue = IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8);
+ Assertions.assertEquals(sourceValue, sinkValue);
+ }
+ Assertions.assertTrue(true);
}
- Assertions.assertTrue(true);
}
}
+ } catch (Exception e) {
+ throw new RuntimeException("get oracle connection error", e);
}
clearSinkTable();
}
@Override
void clearSinkTable() {
- try (Statement statement = jdbcConnection.createStatement()) {
+ try (Statement statement = initializeJdbcConnection(jdbcCase.getJdbcUrl()).createStatement()) {
statement.execute(String.format("TRUNCATE TABLE %s", SINK_TABLE));
- } catch (SQLException e) {
+ } catch (Exception e) {
throw new RuntimeException("test oracle server image error", e);
}
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
index 90b062985..20c5da159 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcStarRocksdbIT.java
@@ -22,9 +22,8 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.util.JdbcCompareUtil;
import org.junit.jupiter.api.Assertions;
-import java.io.IOException;
+import java.sql.Connection;
import java.sql.ResultSet;
-import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
@@ -46,100 +45,104 @@ public class JdbcStarRocksdbIT extends AbstractJdbcIT {
private static final String COLUMN_STRING = "BIGINT_COL, LARGEINT_COL, SMALLINT_COL, TINYINT_COL, BOOLEAN_COL, DECIMAL_COL, DOUBLE_COL, FLOAT_COL, INT_COL, CHAR_COL, VARCHAR_11_COL, STRING_COL, DATETIME_COL, DATE_COL";
private static final String CONFIG_FILE = "/jdbc_starrocks_source_to_sink.conf";
- private static final String DDL_SOURCE = "create table " + DATABASE + "." + SOURCE_TABLE + " (\n" +
- " BIGINT_COL BIGINT,\n" +
- " LARGEINT_COL LARGEINT,\n" +
- " SMALLINT_COL SMALLINT,\n" +
- " TINYINT_COL TINYINT,\n" +
- " BOOLEAN_COL BOOLEAN,\n" +
- " DECIMAL_COL DECIMAL,\n" +
- " DOUBLE_COL DOUBLE,\n" +
- " FLOAT_COL FLOAT,\n" +
- " INT_COL INT,\n" +
- " CHAR_COL CHAR,\n" +
- " VARCHAR_11_COL VARCHAR(11),\n" +
- " STRING_COL STRING,\n" +
- " DATETIME_COL DATETIME,\n" +
- " DATE_COL DATE\n" +
- ")ENGINE=OLAP\n" +
- "DUPLICATE KEY(`BIGINT_COL`)\n" +
- "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" +
- "PROPERTIES (\n" +
- "\"replication_num\" = \"1\",\n" +
- "\"in_memory\" = \"false\"," +
- "\"in_memory\" = \"false\"," +
- "\"storage_format\" = \"DEFAULT\"" +
- ")";
-
-
- private static final String DDL_SINK = "create table " + DATABASE + "." + SINK_TABLE + " (\n" +
- " BIGINT_COL BIGINT,\n" +
- " LARGEINT_COL LARGEINT,\n" +
- " SMALLINT_COL SMALLINT,\n" +
- " TINYINT_COL TINYINT,\n" +
- " BOOLEAN_COL BOOLEAN,\n" +
- " DECIMAL_COL DECIMAL,\n" +
- " DOUBLE_COL DOUBLE,\n" +
- " FLOAT_COL FLOAT,\n" +
- " INT_COL INT,\n" +
- " CHAR_COL CHAR,\n" +
- " VARCHAR_11_COL VARCHAR(11),\n" +
- " STRING_COL STRING,\n" +
- " DATETIME_COL DATETIME,\n" +
- " DATE_COL DATE\n" +
- ")ENGINE=OLAP\n" +
- "DUPLICATE KEY(`BIGINT_COL`)\n" +
- "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" +
- "PROPERTIES (\n" +
- "\"replication_num\" = \"1\",\n" +
- "\"in_memory\" = \"false\"," +
- "\"in_memory\" = \"false\"," +
- "\"storage_format\" = \"DEFAULT\"" +
- ")";
-
- private static final String INIT_DATA_SQL = "insert into " + DATABASE + "." + SOURCE_TABLE + " (\n" +
- " BIGINT_COL,\n" +
- " LARGEINT_COL,\n" +
- " SMALLINT_COL,\n" +
- " TINYINT_COL,\n" +
- " BOOLEAN_COL,\n" +
- " DECIMAL_COL,\n" +
- " DOUBLE_COL,\n" +
- " FLOAT_COL,\n" +
- " INT_COL,\n" +
- " CHAR_COL,\n" +
- " VARCHAR_11_COL,\n" +
- " STRING_COL,\n" +
- " DATETIME_COL,\n" +
- " DATE_COL\n" +
- ")values(\n" +
- "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?\n" +
- ")";
+ private static final String DDL_SOURCE = "create table " + DATABASE + "." + SOURCE_TABLE + " (\n" +
+ " BIGINT_COL BIGINT,\n" +
+ " LARGEINT_COL LARGEINT,\n" +
+ " SMALLINT_COL SMALLINT,\n" +
+ " TINYINT_COL TINYINT,\n" +
+ " BOOLEAN_COL BOOLEAN,\n" +
+ " DECIMAL_COL DECIMAL,\n" +
+ " DOUBLE_COL DOUBLE,\n" +
+ " FLOAT_COL FLOAT,\n" +
+ " INT_COL INT,\n" +
+ " CHAR_COL CHAR,\n" +
+ " VARCHAR_11_COL VARCHAR(11),\n" +
+ " STRING_COL STRING,\n" +
+ " DATETIME_COL DATETIME,\n" +
+ " DATE_COL DATE\n" +
+ ")ENGINE=OLAP\n" +
+ "DUPLICATE KEY(`BIGINT_COL`)\n" +
+ "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" +
+ "PROPERTIES (\n" +
+ "\"replication_num\" = \"1\",\n" +
+ "\"in_memory\" = \"false\"," +
+ "\"in_memory\" = \"false\"," +
+ "\"storage_format\" = \"DEFAULT\"" +
+ ")";
+
+
+ private static final String DDL_SINK = "create table " + DATABASE + "." + SINK_TABLE + " (\n" +
+ " BIGINT_COL BIGINT,\n" +
+ " LARGEINT_COL LARGEINT,\n" +
+ " SMALLINT_COL SMALLINT,\n" +
+ " TINYINT_COL TINYINT,\n" +
+ " BOOLEAN_COL BOOLEAN,\n" +
+ " DECIMAL_COL DECIMAL,\n" +
+ " DOUBLE_COL DOUBLE,\n" +
+ " FLOAT_COL FLOAT,\n" +
+ " INT_COL INT,\n" +
+ " CHAR_COL CHAR,\n" +
+ " VARCHAR_11_COL VARCHAR(11),\n" +
+ " STRING_COL STRING,\n" +
+ " DATETIME_COL DATETIME,\n" +
+ " DATE_COL DATE\n" +
+ ")ENGINE=OLAP\n" +
+ "DUPLICATE KEY(`BIGINT_COL`)\n" +
+ "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" +
+ "PROPERTIES (\n" +
+ "\"replication_num\" = \"1\",\n" +
+ "\"in_memory\" = \"false\"," +
+ "\"in_memory\" = \"false\"," +
+ "\"storage_format\" = \"DEFAULT\"" +
+ ")";
+
+ private static final String INIT_DATA_SQL = "insert into " + DATABASE + "." + SOURCE_TABLE + " (\n" +
+ " BIGINT_COL,\n" +
+ " LARGEINT_COL,\n" +
+ " SMALLINT_COL,\n" +
+ " TINYINT_COL,\n" +
+ " BOOLEAN_COL,\n" +
+ " DECIMAL_COL,\n" +
+ " DOUBLE_COL,\n" +
+ " FLOAT_COL,\n" +
+ " INT_COL,\n" +
+ " CHAR_COL,\n" +
+ " VARCHAR_11_COL,\n" +
+ " STRING_COL,\n" +
+ " DATETIME_COL,\n" +
+ " DATE_COL\n" +
+ ")values(\n" +
+ "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?\n" +
+ ")";
@Override
JdbcCase getJdbcCase() {
Map<String, String> containerEnv = new HashMap<>();
String jdbcUrl = String.format(URL, SR_PORT);
return JdbcCase.builder().dockerImage(DOCKER_IMAGE).networkAliases(NETWORK_ALIASES).containerEnv(containerEnv).driverClass(DRIVER_CLASS)
- .host(HOST).port(SR_PORT).jdbcUrl(jdbcUrl).userName(USERNAME).password(PASSWORD).dataBase(DATABASE)
- .sourceTable(SOURCE_TABLE).sinkTable(SINK_TABLE).driverJar(SR_DRIVER_JAR)
- .ddlSource(DDL_SOURCE).ddlSink(DDL_SINK).initDataSql(INIT_DATA_SQL).configFile(CONFIG_FILE).seaTunnelRow(initTestData()).build();
+ .host(HOST).jdbcTemplate(URL).dataBase(DATABASE).port(SR_PORT).jdbcUrl(jdbcUrl).userName(USERNAME).password(PASSWORD).dataBase(DATABASE)
+ .sourceTable(SOURCE_TABLE).sinkTable(SINK_TABLE).driverJar(SR_DRIVER_JAR)
+ .ddlSource(DDL_SOURCE).ddlSink(DDL_SINK).initDataSql(INIT_DATA_SQL).configFile(CONFIG_FILE).seaTunnelRow(initTestData()).build();
}
@Override
- void compareResult() throws SQLException, IOException {
- assertHasData(SOURCE_TABLE);
- assertHasData(SINK_TABLE);
- JdbcCompareUtil.compare(jdbcConnection, String.format("select * from %s.%s limit 1", DATABASE, SOURCE_TABLE),
+ void compareResult() {
+ try (Connection connection = initializeJdbcConnection(URL)) {
+ assertHasData(SOURCE_TABLE);
+ assertHasData(SINK_TABLE);
+ JdbcCompareUtil.compare(connection, String.format("select * from %s.%s limit 1", DATABASE, SOURCE_TABLE),
String.format("select * from %s.%s limit 1", DATABASE, SINK_TABLE), COLUMN_STRING);
+ } catch (Exception e) {
+ throw new RuntimeException("get starRocks connection error", e);
+ }
clearSinkTable();
}
@Override
void clearSinkTable() {
- try (Statement statement = jdbcConnection.createStatement()) {
+ try (Statement statement = initializeJdbcConnection(URL).createStatement()) {
statement.execute(String.format("TRUNCATE TABLE %s", DATABASE + "." + SINK_TABLE));
- } catch (SQLException e) {
+ } catch (Exception e) {
throw new RuntimeException("test starrocks server image error", e);
}
}
@@ -147,15 +150,15 @@ public class JdbcStarRocksdbIT extends AbstractJdbcIT {
@Override
SeaTunnelRow initTestData() {
return new SeaTunnelRow(
- new Object[]{1234, 1123456, 12, 1, 0, 2222243.2222243, 2222243.22222, 1.22222, 12, "a", "VARCHAR_COL", "STRING_COL", "2022-08-13 17:35:59", "2022-08-13"});
+ new Object[]{1234, 1123456, 12, 1, 0, 2222243.2222243, 2222243.22222, 1.22222, 12, "a", "VARCHAR_COL", "STRING_COL", "2022-08-13 17:35:59", "2022-08-13"});
}
private void assertHasData(String table) {
- try (Statement statement = jdbcConnection.createStatement()) {
+ try (Statement statement = initializeJdbcConnection(URL).createStatement()) {
String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
ResultSet source = statement.executeQuery(sql);
Assertions.assertTrue(source.next());
- } catch (SQLException e) {
+ } catch (Exception e) {
throw new RuntimeException("test starrocks server image error", e);
}
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_assert.conf
new file mode 100644
index 000000000..4b4c5ad62
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_assert.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ Jdbc {
+ driver = com.gbase.jdbc.Driver
+ url = "jdbc:gbase://e2e_gbase8aDb:5258/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
+ user = root
+ password = root
+ query = "select varchar_10_col, char_10_col, text_col, decimal_col, float_col, int_col, tinyint_col, smallint_col, double_col, bigint_col, date_col, timestamp_col, datetime_col, blob_col from e2e_table_source"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
+}
+
+transform {
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+ Assert {
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 1
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 1
+ }
+ ]
+ }
+ }
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Assert
+}