You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/02/07 02:02:07 UTC
[shardingsphere] branch master updated: Optimized CDC implementation with MySQL and PostgreSQL support (#23906)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 26c41c1abf6 Optimized CDC implementation with MySQL and PostgreSQL support (#23906)
26c41c1abf6 is described below
commit 26c41c1abf6ca68bdb6679b1af74324448920972
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Tue Feb 7 10:01:56 2023 +0800
Optimized CDC implementation with MySQL and PostgreSQL support (#23906)
* Optimized CDC implementation with MySQL and PostgreSQL support
* Add CDC client implementation.
* Fix codestyle
* Refactor ImportDataSourceParameter
* Fix ci error
* Fix name
---
kernel/data-pipeline/cdc/client/pom.xml | 4 ++
.../data/pipeline/cdc/client/CDCClient.java | 16 +++--
.../cdc/client/handler/LoginRequestHandler.java | 4 +-
.../client/handler/SubscriptionRequestHandler.java | 6 +-
...nGaussImporter.java => DataSourceImporter.java} | 38 ++++-------
.../ImportDataSourceParameter.java} | 31 ++++-----
.../client/parameter/StartCDCClientParameter.java | 4 ++
.../cdc/client/sqlbuilder/MySQLSQLBuilder.java | 79 ++++++++++++++++++++++
.../cdc/client/sqlbuilder/OpenGaussSQLBuilder.java | 15 +++-
.../client/sqlbuilder/PostgreSQLSQLBuilder.java | 74 ++++++++++++++++++++
.../cdc/client/sqlbuilder/SQLBuilderFactory.java | 6 +-
.../pipeline/cdc/client/util/AnyValueConvert.java | 7 +-
.../cdc/client/example/opengauss/Bootstrap.java | 4 +-
.../cdc/client/sqlbuilder/MySQLSQLBuilderTest.java | 62 +++++++++++++++++
.../client/sqlbuilder/OpenGaussSQLBuilderTest.java | 62 +++++++++++++++++
.../sqlbuilder/PostgreSQLSQLBuilderTest.java | 62 +++++++++++++++++
.../src/test/resources/env/opengauss.properties | 22 ------
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 14 +++-
.../importer/connector/CDCImporterConnector.java | 2 +-
.../pipeline/cdc/util/ColumnValueConvertUtil.java | 37 +++-------
.../mysql/ingest/MySQLIncrementalDumper.java | 2 +-
.../backend/handler/cdc/CDCBackendHandler.java | 5 +-
.../pipeline/cases/base/PipelineBaseE2EIT.java | 6 +-
.../cases/migration/AbstractMigrationE2EIT.java | 7 --
24 files changed, 447 insertions(+), 122 deletions(-)
diff --git a/kernel/data-pipeline/cdc/client/pom.xml b/kernel/data-pipeline/cdc/client/pom.xml
index 03be36d694b..f549dbb815f 100644
--- a/kernel/data-pipeline/cdc/client/pom.xml
+++ b/kernel/data-pipeline/cdc/client/pom.xml
@@ -47,5 +47,9 @@
<groupId>org.opengauss</groupId>
<artifactId>opengauss-jdbc</artifactId>
</dependency>
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
index 29e8dc33f38..fbb0a5c8f79 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
@@ -28,7 +28,6 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
-import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoginRequestHandler;
import org.apache.shardingsphere.data.pipeline.cdc.client.handler.SubscriptionRequestHandler;
@@ -69,15 +68,15 @@ public final class CDCClient {
/**
* Start ShardingSphere CDC client.
*/
- @SneakyThrows(InterruptedException.class)
public void start() {
startInternal(parameter.getAddress(), parameter.getPort());
}
- private void startInternal(final String address, final int port) throws InterruptedException {
+ private void startInternal(final String address, final int port) {
Bootstrap bootstrap = new Bootstrap();
+ NioEventLoopGroup group = new NioEventLoopGroup();
bootstrap.channel(NioSocketChannel.class)
- .group(new NioEventLoopGroup())
+ .group(group)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.SO_REUSEADDR, true)
.handler(new ChannelInitializer<NioSocketChannel>() {
@@ -92,7 +91,12 @@ public final class CDCClient {
channel.pipeline().addLast(new SubscriptionRequestHandler(parameter));
}
});
- ChannelFuture future = bootstrap.connect(address, port).sync();
- future.channel().closeFuture().sync();
+ try {
+ ChannelFuture future = bootstrap.connect(address, port).sync();
+ future.channel().closeFuture().sync();
+ } catch (final InterruptedException ex) {
+ log.warn("CDC client interrupted", ex);
+ group.shutdownGracefully();
+ }
}
}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
index f844cb13393..d14df086502 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
@@ -65,7 +65,7 @@ public final class LoginRequestHandler extends ChannelInboundHandlerAdapter {
ClientConnectionContext connectionContext = ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
switch (connectionContext.getStatus()) {
case CONNECTED:
- setLogindRequest(ctx, response, connectionContext);
+ setLoginRequest(ctx, response, connectionContext);
break;
case NOT_LOGGED_IN:
sendSubscriptionEvent(ctx, response, connectionContext);
@@ -75,7 +75,7 @@ public final class LoginRequestHandler extends ChannelInboundHandlerAdapter {
}
}
- private void setLogindRequest(final ChannelHandlerContext ctx, final CDCResponse response, final ClientConnectionContext connectionContext) {
+ private void setLoginRequest(final ChannelHandlerContext ctx, final CDCResponse response, final ClientConnectionContext connectionContext) {
ServerGreetingResult serverGreetingResult = response.getServerGreetingResult();
log.info("Server greeting result, server version: {}, protocol version: {}", serverGreetingResult.getServerVersion(), serverGreetingResult.getProtocolVersion());
String encryptPassword = Hashing.sha256().hashBytes(password.getBytes()).toString().toUpperCase();
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
index 8ac5fbe063a..e7288950a8a 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
@@ -23,8 +23,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
+import org.apache.shardingsphere.data.pipeline.cdc.client.importer.DataSourceImporter;
import org.apache.shardingsphere.data.pipeline.cdc.client.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.cdc.client.importer.ImporterFactory;
import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtil;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckRequest;
@@ -51,7 +51,7 @@ public final class SubscriptionRequestHandler extends ChannelInboundHandlerAdapt
public SubscriptionRequestHandler(final StartCDCClientParameter parameter) {
this.parameter = parameter;
- importer = ImporterFactory.getImporter(parameter.getDatabaseType());
+ importer = new DataSourceImporter(parameter.getDatabaseType(), parameter.getImportDataSourceParameter());
}
@Override
@@ -105,7 +105,7 @@ public final class SubscriptionRequestHandler extends ChannelInboundHandlerAdapt
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
- log.error("write data failed", ex);
+ throw new RuntimeException(ex);
}
}
// TODO data needs to be processed, such as writing to a database
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/OpenGaussImporter.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/DataSourceImporter.java
similarity index 78%
rename from kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/OpenGaussImporter.java
rename to kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/DataSourceImporter.java
index 7ff1c61f183..19fe8231793 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/OpenGaussImporter.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/DataSourceImporter.java
@@ -18,17 +18,14 @@
package org.apache.shardingsphere.data.pipeline.cdc.client.importer;
import com.google.protobuf.Any;
-import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ProtocolStringList;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.ImportDataSourceParameter;
import org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder.SQLBuilder;
import org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder.SQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.AnyValueConvert;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
-import java.io.IOException;
-import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -36,36 +33,31 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-import java.util.Properties;
/**
- * OpenGauss importer.
+ * Data source importer.
*/
-@RequiredArgsConstructor
@Slf4j
-public final class OpenGaussImporter implements Importer {
-
- private final SQLBuilder sqlBuilder = SQLBuilderFactory.getSQLBuilder("openGauss");
+public final class DataSourceImporter implements Importer {
private final Connection connection;
- public OpenGaussImporter() {
- Properties properties = new Properties();
- try (InputStream inputStream = OpenGaussImporter.class.getClassLoader().getResourceAsStream("env/opengauss.properties")) {
- properties.load(inputStream);
- String url = properties.getProperty("url");
- String port = properties.getProperty("port");
- String database = properties.getProperty("database");
- String username = properties.getProperty("username");
- String password = properties.getProperty("password");
- connection = DriverManager.getConnection(String.format("jdbc:opengauss://%s:%s/%s", url, port, database), username, password);
- } catch (final IOException | SQLException ex) {
+ private final SQLBuilder sqlBuilder;
+
+ public DataSourceImporter(final String databaseType, final ImportDataSourceParameter dataSourceParam) {
+ String jdbcUrl = Optional.ofNullable(dataSourceParam.getJdbcUrl()).orElseThrow(() -> new IllegalArgumentException("jdbcUrl is null"));
+ String username = Optional.ofNullable(dataSourceParam.getUsername()).orElseThrow(() -> new IllegalArgumentException("username is null"));
+ String password = Optional.ofNullable(dataSourceParam.getPassword()).orElseThrow(() -> new IllegalArgumentException("password is null"));
+ try {
+ connection = DriverManager.getConnection(jdbcUrl, username, password);
+ } catch (final SQLException ex) {
throw new RuntimeException(ex);
}
+ sqlBuilder = SQLBuilderFactory.getSQLBuilder(databaseType);
}
@Override
- public void write(final Record record) throws SQLException, InvalidProtocolBufferException {
+ public void write(final Record record) throws Exception {
Optional<String> sqlOptional = buildSQL(record);
if (!sqlOptional.isPresent()) {
log.error("build sql failed, record {}", record);
@@ -120,7 +112,7 @@ public final class OpenGaussImporter implements Importer {
}
@Override
- public void close() throws SQLException {
+ public void close() throws Exception {
connection.close();
}
}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/ImporterFactory.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/ImportDataSourceParameter.java
similarity index 62%
rename from kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/ImporterFactory.java
rename to kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/ImportDataSourceParameter.java
index 68a356d3fd2..52a9664fe6d 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/ImporterFactory.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/ImportDataSourceParameter.java
@@ -15,26 +15,21 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.cdc.client.importer;
+package org.apache.shardingsphere.data.pipeline.cdc.client.parameter;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
/**
- * Importer factory.
+ * Import data source parameter.
*/
-public final class ImporterFactory {
+@RequiredArgsConstructor
+@Getter
+public final class ImportDataSourceParameter {
+
+ private final String jdbcUrl;
+
+ private final String username;
- /**
- * Get importer.
- *
- * @param databaseType database type
- * @return importer
- */
- // TODO use SPI
- public static Importer getImporter(final String databaseType) {
- switch (databaseType) {
- case "openGauss":
- return new OpenGaussImporter();
- default:
- return null;
- }
- }
+ private final String password;
}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
index 0377c1f9f55..a8c916f7b0c 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.cdc.client.parameter;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
@@ -29,6 +30,7 @@ import java.util.List;
*/
@Getter
@Setter
+@RequiredArgsConstructor
public final class StartCDCClientParameter {
private String databaseType;
@@ -50,4 +52,6 @@ public final class StartCDCClientParameter {
private SubscriptionMode subscriptionMode = SubscriptionMode.INCREMENTAL;
private boolean incrementalGlobalOrderly;
+
+ private final ImportDataSourceParameter importDataSourceParameter;
}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/MySQLSQLBuilder.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/MySQLSQLBuilder.java
new file mode 100644
index 00000000000..e9ea01263d4
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/MySQLSQLBuilder.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
+
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * MySQL SQL builder.
+ *
+ * <p>Supplies the MySQL-specific pieces on top of {@code AbstractSQLBuilder}: a reserved-word list that
+ * {@code isKeyword} checks after upper-casing the candidate, the backtick as both the left and the right
+ * identifier quote string, and an insert statement that is turned into an upsert.</p>
+ *
+ * <p>{@code buildInsertSQL} returns the parent's insert SQL unchanged when the record's table meta data lists
+ * no unique key names. Otherwise it appends {@code ON DUPLICATE KEY UPDATE} followed by one
+ * {@code col=VALUES(col)} assignment for every key of the record's after map that is not a unique key name;
+ * each column name is passed through the inherited {@code quote} method.</p>
+ *
+ * <p>NOTE(review): when every key of the after map is a unique key name, {@code updateValue} stays empty and
+ * {@code setLength(updateValue.length() - 1)} is called with -1, which throws an IndexOutOfBoundsException;
+ * confirm whether such records can reach this builder. {@code RESERVED_KEYWORDS} also lists "ELSE", "ELSEIF",
+ * "ENCLOSED" and "ESCAPED" twice, which does not affect the {@code contains} lookup.</p>
+ */
+public final class MySQLSQLBuilder extends AbstractSQLBuilder {
+
+ private static final List<String> RESERVED_KEYWORDS = Arrays.asList("ADD", "ALL", "ALTER", "ANALYZE", "AND", "AS", "ASC", "BEFORE", "BETWEEN", "BIGINT", "BINARY", "BLOB", "BOTH", "BY", "CALL",
+ "CASCADE", "CASE", "CHANGE", "CHAR", "CHARACTER", "CHECK", "COLLATE", "COLUMN", "CONDITION", "CONSTRAINT", "CONTINUE", "CONVERT", "CREATE", "CROSS", "CUBE", "CUME_DIST", "CURRENT_DATE",
+ "CURRENT_TIME", "CURRENT_TIMESTAMP", "CURRENT_USER", "CURSOR", "DATABASE", "DATABASES", "DAY_HOUR", "DAY_MICROSECOND", "DAY_MINUTE", "DAY_SECOND", "DEC", "DECIMAL", "DECLARE", "DEFAULT",
+ "DELAYED", "DELETE", "DENSE_RANK", "DESC", "DESCRIBE", "DETERMINISTIC", "DISTINCT", "DISTINCTROW", "DIV", "DOUBLE", "DROP", "DUAL", "ELSE", "ELSEIF", "ENCLOSED", "ESCAPED", "EACH", "ELSE",
+ "ELSEIF", "EMPTY", "ENCLOSED", "ESCAPED", "EXCEPT", "EXISTS", "EXIT", "EXPLAIN", "FALSE", "FETCH", "FIRST_VALUE", "FLOAT4", "FLOAT8", "FOR", "FORCE", "FOREIGN", "FROM", "FULLTEXT",
+ "FUNCTION", "GENERATED", "GET", "GRANT", "GROUP", "GROUPING", "GROUPS", "HAVING", "HIGH_PRIORITY", "HOUR_MICROSECOND", "HOUR_MINUTE", "HOUR_SECOND", "IF", "IGNORE", "IN", "INDEX",
+ "INFILE", "INNER", "INOUT", "INSENSITIVE", "INSERT", "INT", "INT1", "INT2", "INT3", "INT4", "INT8", "INTEGER", "INTERSECT", "INTERVAL", "INTO", "IO_AFTER_GTIDS", "IO_BEFORE_GTIDS", "IS",
+ "ITERATE", "JOIN", "JSON_TABLE", "KEY", "KEYS", "KILL", "LAG", "LAST_VALUE", "LATERAL", "LEAD", "LEADING", "LEAVE", "LEFT", "LIKE", "LIMIT", "LINES", "LOAD", "LOCALTIME", "LOCALTIMESTAMP",
+ "LOCK", "LONG", "LONGBLOB", "LONGTEXT", "LOOP", "LOW_PRIORITY", "MASTER_BIND", "MASTER_SSL_VERIFY_SERVER_CERT", "MATCH", "MAXVALUE", "MEDIUMBLOB", "MEDIUMINT", "MEDIUMTEXT", "MIDDLEINT",
+ "MINUTE_MICROSECOND", "MINUTE_SECOND", "MOD", "MODIFIES", "NATURAL", "NOT", "NO_WRITE_TO_BINLOG", "NTH_VALUE", "NTILE", "NULL", "NUMERIC", "OF", "ON", "OPTIMIZE", "OPTIMIZER_COSTS",
+ "OPTION", "OPTIONALLY", "OR", "ORDER", "OUT", "OUTER", "OUTFILE", "OVER", "PARTITION", "PERCENT_RANK", "PRECISION", "PRIMARY", "PROCEDURE", "PURGE", "RANK", "READ", "REAL", "RECURSIVE",
+ "REFERENCES", "REGEXP", "RELEASE", "RENAME", "REPEAT", "REPLACE", "REQUIRE", "RESIGNAL", "RESTRICT", "RETURN", "REVOKE", "RIGHT", "RLIKE", "ROW", "ROWS", "ROW_NUMBER", "SCHEMA", "SCHEMAS",
+ "SELECT", "SENSITIVE", "SEPARATOR", "SET", "SHOW", "SIGNAL", "SMALLINT", "SPATIAL", "SPECIFIC", "SQL", "SQLEXCEPTION", "SQLSTATE", "SQLWARNING", "SQL_BIG_RESULT", "SQL_CALC_FOUND_ROWS",
+ "SQL_SMALL_RESULT", "SSL", "STARTING", "STORED", "STRAIGHT_JOIN", "SYSTEM", "TABLE", "TERMINATED", "THEN", "TINYBLOB", "TINYINT", "TINYTEXT", "TO", "TRAILING", "TRIGGER", "TRUE", "UNDO",
+ "UNION", "UNIQUE", "UNLOCK", "UNSIGNED", "UPDATE", "USAGE", "USE", "USING", "UTC_DATE", "UTC_TIME", "UTC_TIMESTAMP", "VALUES", "VARBINARY", "VARCHAR", "VARCHARACTER", "VARYING", "VIRTUAL",
+ "WHEN", "WHERE", "WHILE", "WINDOW", "WITH", "WRITE", "XOR", "YEAR_MONTH", "ZEROFILL");
+
+ @Override
+ protected boolean isKeyword(final String item) {
+ return RESERVED_KEYWORDS.contains(item.toUpperCase());
+ }
+
+ @Override
+ protected String getLeftIdentifierQuoteString() {
+ return "`";
+ }
+
+ @Override
+ protected String getRightIdentifierQuoteString() {
+ return "`";
+ }
+
+ @Override
+ public String buildInsertSQL(final Record record) {
+ String insertSql = super.buildInsertSQL(record);
+ List<String> uniqueKeyNamesList = record.getTableMetaData().getUniqueKeyNamesList();
+ if (uniqueKeyNamesList.isEmpty()) {
+ return insertSql;
+ }
+ StringBuilder updateValue = new StringBuilder();
+ for (String each : record.getAfterMap().keySet()) {
+ if (uniqueKeyNamesList.contains(each)) {
+ continue;
+ }
+ updateValue.append(quote(each)).append("=VALUES(").append(quote(each)).append("),");
+ }
+ updateValue.setLength(updateValue.length() - 1);
+ return insertSql + " ON DUPLICATE KEY UPDATE " + updateValue;
+ }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilder.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilder.java
index bb0132b27a0..afc9b401b7c 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilder.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilder.java
@@ -58,9 +58,18 @@ public final class OpenGaussSQLBuilder extends AbstractSQLBuilder {
@Override
public String buildInsertSQL(final Record record) {
String insertSql = super.buildInsertSQL(record);
- if (!record.getTableMetaData().getUniqueKeyNamesList().isEmpty()) {
- return insertSql + " ON DUPLICATE KEY UPDATE NOTHING";
+ List<String> uniqueKeyNamesList = record.getTableMetaData().getUniqueKeyNamesList();
+ if (uniqueKeyNamesList.isEmpty()) {
+ return insertSql;
}
- return insertSql;
+ StringBuilder updateValue = new StringBuilder();
+ for (String each : record.getAfterMap().keySet()) {
+ if (uniqueKeyNamesList.contains(each)) {
+ continue;
+ }
+ updateValue.append(quote(each)).append("=EXCLUDED.").append(quote(each)).append(",");
+ }
+ updateValue.setLength(updateValue.length() - 1);
+ return insertSql + " ON DUPLICATE KEY UPDATE " + updateValue;
}
}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/PostgreSQLSQLBuilder.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/PostgreSQLSQLBuilder.java
new file mode 100644
index 00000000000..95c5c37ce72
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/PostgreSQLSQLBuilder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
+
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * PostgreSQL SQL builder.
+ *
+ * <p>Supplies the PostgreSQL-specific pieces on top of {@code AbstractSQLBuilder}: a reserved-word list that
+ * {@code isKeyword} checks after upper-casing the candidate, the double quote as both the left and the right
+ * identifier quote string, and an insert statement that is turned into an upsert.</p>
+ *
+ * <p>{@code buildInsertSQL} returns the parent's insert SQL unchanged when the record's table meta data lists
+ * no unique key names. Otherwise it appends {@code ON CONFLICT (keys) DO UPDATE SET assignments}, where
+ * {@code keys} is the comma-joined list of unique key names and {@code assignments} holds one
+ * {@code col=EXCLUDED.col} entry for every key of the record's after map that is not a unique key name;
+ * every name is passed through the inherited {@code quote} method.</p>
+ *
+ * <p>NOTE(review): when every key of the after map is a unique key name, {@code updateValue} stays empty and
+ * {@code setLength(updateValue.length() - 1)} is called with -1, which throws an IndexOutOfBoundsException;
+ * confirm whether such records can reach this builder.</p>
+ */
+public final class PostgreSQLSQLBuilder extends AbstractSQLBuilder {
+
+ private static final List<String> RESERVED_KEYWORDS = Arrays.asList("ALL", "ANALYSE", "ANALYZE", "AND", "ANY", "ARRAY", "AS", "ASC", "ASYMMETRIC", "AUTHORIZATION", "BETWEEN", "BIGINT", "BINARY",
+ "BIT", "BOOLEAN", "BOTH", "CASE", "CAST", "CHAR", "CHARACTER", "CHECK", "COALESCE", "COLLATE", "COLLATION", "COLUMN", "CONCURRENTLY", "CONSTRAINT", "CREATE", "CROSS", "CURRENT_CATALOG",
+ "CURRENT_DATE", "CURRENT_ROLE", "CURRENT_SCHEMA", "CURRENT_TIME", "CURRENT_TIMESTAMP", "CURRENT_USER", "DEC", "DECIMAL", "DEFAULT", "DEFERRABLE", "DESC", "DISTINCT", "DO", "ELSE", "END",
+ "EXCEPT", "EXISTS", "EXTRACT", "FALSE", "FETCH", "FLOAT", "FOR", "FOREIGN", "FREEZE", "FROM", "FULL", "GRANT", "GREATEST", "GROUP", "GROUPING", "HAVING", "ILIKE", "IN", "INITIALLY",
+ "INNER", "INOUT", "INT", "INTEGER", "INTERSECT", "INTERVAL", "INTO", "IS", "ISNULL", "JOIN", "LATERAL", "LEADING", "LEAST", "LEFT", "LIKE", "LIMIT", "LOCALTIME", "LOCALTIMESTAMP",
+ "NATIONAL", "NATURAL", "NCHAR", "NONE", "NORMALIZE", "NOT", "NOTNULL", "NULL", "NULLIF", "NUMERIC", "OFFSET", "ON", "ONLY", "OR", "ORDER", "OUT", "OUTER", "OVERLAPS", "OVERLAY", "PLACING",
+ "POSITION", "PRECISION", "PRIMARY", "REAL", "REFERENCES", "RETURNING", "RIGHT", "ROW", "SELECT", "SESSION_USER", "SETOF", "SIMILAR", "SMALLINT", "SOME", "SUBSTRING", "SYMMETRIC", "TABLE",
+ "TABLESAMPLE", "THEN", "TIME", "TIMESTAMP", "TO", "TRAILING", "TREAT", "TRIM", "TRUE", "UNION", "UNIQUE", "USER", "USING", "VALUES", "VARCHAR", "VARIADIC", "VERBOSE", "WHEN", "WHERE",
+ "WINDOW", "WITH", "XMLATTRIBUTES", "XMLCONCAT", "XMLELEMENT", "XMLEXISTS", "XMLFOREST", "XMLNAMESPACES", "XMLPARSE", "XMLPI", "XMLROOT", "XMLSERIALIZE", "XMLTABLE");
+
+ @Override
+ protected boolean isKeyword(final String item) {
+ return RESERVED_KEYWORDS.contains(item.toUpperCase());
+ }
+
+ @Override
+ protected String getLeftIdentifierQuoteString() {
+ return "\"";
+ }
+
+ @Override
+ protected String getRightIdentifierQuoteString() {
+ return "\"";
+ }
+
+ @Override
+ public String buildInsertSQL(final Record record) {
+ String insertSql = super.buildInsertSQL(record);
+ List<String> uniqueKeyNamesList = record.getTableMetaData().getUniqueKeyNamesList();
+ if (uniqueKeyNamesList.isEmpty()) {
+ return insertSql;
+ }
+ StringBuilder updateValue = new StringBuilder();
+ for (String each : record.getAfterMap().keySet()) {
+ if (uniqueKeyNamesList.contains(each)) {
+ continue;
+ }
+ updateValue.append(quote(each)).append("=EXCLUDED.").append(quote(each)).append(",");
+ }
+ updateValue.setLength(updateValue.length() - 1);
+ String uniqueKeyNames = uniqueKeyNamesList.stream().map(this::quote).collect(Collectors.joining(","));
+ return insertSql + String.format(" ON CONFLICT (%s) DO UPDATE SET %s", uniqueKeyNames, updateValue);
+ }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilderFactory.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilderFactory.java
index 934a2768c3a..6fdb544eb6e 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilderFactory.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilderFactory.java
@@ -23,7 +23,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
public final class SQLBuilderFactory {
/**
- * Get sql builder.
+ * Get SQL builder.
*
* @param databaseType database type
* @return SQL builder
@@ -32,6 +32,10 @@ public final class SQLBuilderFactory {
switch (databaseType) {
case "openGauss":
return new OpenGaussSQLBuilder();
+ case "MySQL":
+ return new MySQLSQLBuilder();
+ case "PostgreSQL":
+ return new PostgreSQLSQLBuilder();
default:
throw new UnsupportedOperationException(String.format("Not supported %s now", databaseType));
}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/AnyValueConvert.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/AnyValueConvert.java
index 71f39a31cec..afc52c8c555 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/AnyValueConvert.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/AnyValueConvert.java
@@ -26,7 +26,7 @@ import com.google.protobuf.Int32Value;
import com.google.protobuf.Int64Value;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.StringValue;
-import com.google.protobuf.util.JsonFormat;
+import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigDecimalValue;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigIntegerValue;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BlobValue;
@@ -43,6 +43,7 @@ import java.time.LocalTime;
/**
* Any value convert.
*/
+@Slf4j
public final class AnyValueConvert {
/**
@@ -98,7 +99,9 @@ public final class AnyValueConvert {
if (any.is(BlobValue.class)) {
return any.unpack(BlobValue.class).getValue().toByteArray();
}
- return JsonFormat.printer().includingDefaultValueFields().print(any);
+ // TODO can't use JsonFormat here: it might change the original value without any error prompt. More types need to be covered.
+ log.error("not support unpack value={}", any);
+ throw new UnsupportedOperationException(String.format("not support unpack the type %s", any.getTypeUrl()));
}
private static Timestamp converProtobufTimestamp(final com.google.protobuf.Timestamp timestamp) {
diff --git a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
index f7c81de30e5..b86b112057e 100644
--- a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
+++ b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.cdc.client.example.opengauss;
import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
+import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.ImportDataSourceParameter;
import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
@@ -32,7 +33,8 @@ public final class Bootstrap {
* @param args args
*/
public static void main(final String[] args) {
- StartCDCClientParameter parameter = new StartCDCClientParameter();
+ // JDBC URL follows the jdbc:opengauss://host:port/database form; the "//" before host:port is required for localhost:5432 to be read as the server address.
+ ImportDataSourceParameter importDataSourceParam = new ImportDataSourceParameter("jdbc:opengauss://localhost:5432/cdc_db?stringtype=unspecified", "gaussdb", "Root@123");
+ StartCDCClientParameter parameter = new StartCDCClientParameter(importDataSourceParam);
parameter.setAddress("127.0.0.1");
parameter.setPort(33071);
parameter.setUsername("root");
diff --git a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/MySQLSQLBuilderTest.java b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/MySQLSQLBuilderTest.java
new file mode 100644
index 00000000000..f922596f4dc
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/MySQLSQLBuilderTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.StringValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.TableMetaData;
+import org.junit.Test;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Checks the INSERT statements that {@code MySQLSQLBuilder} produces for a sample {@code t_order} record.
+ */
+public final class MySQLSQLBuilderTest {
+
+    /**
+     * With {@code order_id} registered as a unique key, the expected statement is the plain INSERT
+     * followed by an {@code ON DUPLICATE KEY UPDATE} clause covering the remaining columns.
+     */
+    @Test
+    public void assertBuildInsertSQLWithUniqueKey() {
+        TableMetaData metaData = TableMetaData.newBuilder().setTableName("t_order").addUniqueKeyNames("order_id").setDatabase("cdc_db").build();
+        String actual = new MySQLSQLBuilder().buildInsertSQL(createRecord(metaData));
+        assertThat(actual, is("INSERT INTO t_order(order_id,user_id,status) VALUES(?,?,?) ON DUPLICATE KEY UPDATE user_id=VALUES(user_id),status=VALUES(status)"));
+    }
+
+    /**
+     * Without any unique key names in the table metadata, only the plain INSERT is expected.
+     */
+    @Test
+    public void assertBuildInsertSQLWithoutUniqueKey() {
+        TableMetaData metaData = TableMetaData.newBuilder().setTableName("t_order").setDatabase("cdc_db").build();
+        String actual = new MySQLSQLBuilder().buildInsertSQL(createRecord(metaData));
+        assertThat(actual, is("INSERT INTO t_order(order_id,user_id,status) VALUES(?,?,?)"));
+    }
+
+    // Combines the given table metadata with the three sample "after" column values.
+    private Record createRecord(final TableMetaData tableMetaData) {
+        return Record.newBuilder().setTableMetaData(tableMetaData).putAllAfter(buildAfterMap()).build();
+    }
+
+    // Insertion-ordered map; presumably this keeps the column order aligned with the expected SQL.
+    private Map<String, Any> buildAfterMap() {
+        Map<String, Any> result = new LinkedHashMap<>();
+        result.put("order_id", Any.pack(Int32Value.of(1)));
+        result.put("user_id", Any.pack(Int32Value.of(2)));
+        result.put("status", Any.pack(StringValue.of("OK")));
+        return result;
+    }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilderTest.java b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilderTest.java
new file mode 100644
index 00000000000..90a34bd8de8
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilderTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.StringValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.TableMetaData;
+import org.junit.Test;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Checks the INSERT statements that {@code OpenGaussSQLBuilder} produces for a sample {@code t_order} record.
+ */
+public final class OpenGaussSQLBuilderTest {
+
+    /**
+     * With {@code order_id} registered as a unique key, the expected statement is the plain INSERT
+     * followed by an {@code ON DUPLICATE KEY UPDATE} clause that reads the new values via {@code EXCLUDED}.
+     */
+    @Test
+    public void assertBuildInsertSQLWithUniqueKey() {
+        TableMetaData metaData = TableMetaData.newBuilder().setTableName("t_order").addUniqueKeyNames("order_id").setDatabase("cdc_db").build();
+        String actual = new OpenGaussSQLBuilder().buildInsertSQL(createRecord(metaData));
+        assertThat(actual, is("INSERT INTO t_order(order_id,user_id,status) VALUES(?,?,?) ON DUPLICATE KEY UPDATE user_id=EXCLUDED.user_id,status=EXCLUDED.status"));
+    }
+
+    /**
+     * Without any unique key names in the table metadata, only the plain INSERT is expected.
+     */
+    @Test
+    public void assertBuildInsertSQLWithoutUniqueKey() {
+        TableMetaData metaData = TableMetaData.newBuilder().setTableName("t_order").setDatabase("cdc_db").build();
+        String actual = new OpenGaussSQLBuilder().buildInsertSQL(createRecord(metaData));
+        assertThat(actual, is("INSERT INTO t_order(order_id,user_id,status) VALUES(?,?,?)"));
+    }
+
+    // Combines the given table metadata with the three sample "after" column values.
+    private Record createRecord(final TableMetaData tableMetaData) {
+        return Record.newBuilder().setTableMetaData(tableMetaData).putAllAfter(buildAfterMap()).build();
+    }
+
+    // Insertion-ordered map; presumably this keeps the column order aligned with the expected SQL.
+    private Map<String, Any> buildAfterMap() {
+        Map<String, Any> result = new LinkedHashMap<>();
+        result.put("order_id", Any.pack(Int32Value.of(1)));
+        result.put("user_id", Any.pack(Int32Value.of(2)));
+        result.put("status", Any.pack(StringValue.of("OK")));
+        return result;
+    }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/PostgreSQLSQLBuilderTest.java b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/PostgreSQLSQLBuilderTest.java
new file mode 100644
index 00000000000..1cefb83e7dc
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/PostgreSQLSQLBuilderTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.StringValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.TableMetaData;
+import org.junit.Test;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Checks the INSERT statements that {@code PostgreSQLSQLBuilder} produces for a sample {@code t_order} record.
+ */
+public final class PostgreSQLSQLBuilderTest {
+
+    /**
+     * With {@code order_id} registered as a unique key, the expected statement is the plain INSERT
+     * followed by {@code ON CONFLICT (order_id) DO UPDATE SET} for the remaining columns.
+     */
+    @Test
+    public void assertBuildInsertSQLWithUniqueKey() {
+        TableMetaData metaData = TableMetaData.newBuilder().setTableName("t_order").addUniqueKeyNames("order_id").setDatabase("cdc_db").build();
+        String actual = new PostgreSQLSQLBuilder().buildInsertSQL(createRecord(metaData));
+        assertThat(actual, is("INSERT INTO t_order(order_id,user_id,status) VALUES(?,?,?) ON CONFLICT (order_id) DO UPDATE SET user_id=EXCLUDED.user_id,status=EXCLUDED.status"));
+    }
+
+    /**
+     * Without any unique key names in the table metadata, only the plain INSERT is expected.
+     */
+    @Test
+    public void assertBuildInsertSQLWithoutUniqueKey() {
+        TableMetaData metaData = TableMetaData.newBuilder().setTableName("t_order").setDatabase("cdc_db").build();
+        String actual = new PostgreSQLSQLBuilder().buildInsertSQL(createRecord(metaData));
+        assertThat(actual, is("INSERT INTO t_order(order_id,user_id,status) VALUES(?,?,?)"));
+    }
+
+    // Combines the given table metadata with the three sample "after" column values.
+    private Record createRecord(final TableMetaData tableMetaData) {
+        return Record.newBuilder().setTableMetaData(tableMetaData).putAllAfter(buildAfterMap()).build();
+    }
+
+    // Insertion-ordered map; presumably this keeps the column order aligned with the expected SQL.
+    private Map<String, Any> buildAfterMap() {
+        Map<String, Any> result = new LinkedHashMap<>();
+        result.put("order_id", Any.pack(Int32Value.of(1)));
+        result.put("user_id", Any.pack(Int32Value.of(2)));
+        result.put("status", Any.pack(StringValue.of("OK")));
+        return result;
+    }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/test/resources/env/opengauss.properties b/kernel/data-pipeline/cdc/client/src/test/resources/env/opengauss.properties
deleted file mode 100644
index 88f4616e123..00000000000
--- a/kernel/data-pipeline/cdc/client/src/test/resources/env/opengauss.properties
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-url=127.0.0.1
-port=5432
-database=cdc_db
-username=gaussdb
-password=Root@123
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 4aa5b30e55a..5708a485a4d 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -74,7 +74,6 @@ import javax.sql.DataSource;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -175,7 +174,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName()))));
- TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(Collections.emptyMap());
+ TableNameSchemaNameMapping tableNameSchemaNameMapping = getTableNameSchemaNameMapping(jobConfig.getTableNames());
String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
StandardPipelineDataSourceConfiguration actualDataSourceConfiguration = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig, dataSourceName, actualDataSourceConfiguration, tableNameMap, tableNameSchemaNameMapping);
@@ -185,6 +184,17 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
return result;
}
+    // Builds the table-to-schema mapping from dot-separated table names: the text before the first dot is
+    // stored as the schema for the logic table named by the second segment. Names without a dot add no entry.
+    private TableNameSchemaNameMapping getTableNameSchemaNameMapping(final List<String> tableNames) {
+        Map<LogicTableName, String> schemaNamesByTable = new LinkedHashMap<>();
+        for (String qualifiedName : tableNames) {
+            String[] segments = qualifiedName.split("\\.");
+            if (segments.length <= 1) {
+                continue;
+            }
+            schemaNamesByTable.put(new LogicTableName(segments[1]), segments[0]);
+        }
+        return new TableNameSchemaNameMapping(schemaNamesByTable);
+    }
+
private static DumperConfiguration buildDumperConfiguration(final CDCJobConfiguration jobConfig, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSourceConfig,
final Map<ActualTableName, LogicTableName> tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
DumperConfiguration result = new DumperConfiguration();
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
index 881629588ba..d1b3e8e90bc 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
@@ -90,7 +90,7 @@ public final class CDCImporterConnector implements ImporterConnector {
this.jobShardingCount = jobShardingCount;
tableNames.stream().filter(each -> each.contains(".")).forEach(each -> {
String[] split = each.split("\\.");
- tableNameSchemaMap.put(split[0], split[1]);
+ tableNameSchemaMap.put(split[1], split[0]);
});
this.dataRecordComparator = dataRecordComparator;
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtil.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtil.java
index c62c9327748..b84cf56906b 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtil.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtil.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.cdc.util;
-import com.google.gson.Gson;
import com.google.protobuf.BoolValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
@@ -25,12 +24,8 @@ import com.google.protobuf.DoubleValue;
import com.google.protobuf.FloatValue;
import com.google.protobuf.Int32Value;
import com.google.protobuf.Int64Value;
-import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
-import com.google.protobuf.Message.Builder;
import com.google.protobuf.StringValue;
-import com.google.protobuf.Struct;
-import com.google.protobuf.util.JsonFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigDecimalValue;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigIntegerValue;
@@ -58,8 +53,6 @@ import java.util.Date;
@Slf4j
public final class ColumnValueConvertUtil {
- private static final Gson GSON = new Gson();
-
/**
* Convert java object to protobuf message.
*
@@ -71,37 +64,37 @@ public final class ColumnValueConvertUtil {
return NullValue.newBuilder().build();
}
if (object instanceof Integer) {
- return Int32Value.newBuilder().setValue((int) object).build();
+ return Int32Value.of((int) object);
}
if (object instanceof Short) {
- return Int32Value.newBuilder().setValue(((Short) object).intValue()).build();
+ return Int32Value.of(((Short) object).intValue());
}
if (object instanceof Byte) {
- return Int32Value.newBuilder().setValue(((Byte) object).intValue()).build();
+ return Int32Value.of(((Byte) object).intValue());
}
if (object instanceof Long) {
- return Int64Value.newBuilder().setValue((long) object).build();
+ return Int64Value.of((long) object);
}
if (object instanceof BigInteger) {
return BigIntegerValue.newBuilder().setValue(ByteString.copyFrom(((BigInteger) object).toByteArray())).build();
}
if (object instanceof Float) {
- return FloatValue.newBuilder().setValue((float) object).build();
+ return FloatValue.of((float) object);
}
if (object instanceof Double) {
- return DoubleValue.newBuilder().setValue((double) object).build();
+ return DoubleValue.of((double) object);
}
if (object instanceof BigDecimal) {
return BigDecimalValue.newBuilder().setValue(object.toString()).build();
}
if (object instanceof String) {
- return StringValue.newBuilder().setValue(object.toString()).build();
+ return StringValue.of(object.toString());
}
if (object instanceof Boolean) {
- return BoolValue.newBuilder().setValue((boolean) object).build();
+ return BoolValue.of((boolean) object);
}
if (object instanceof byte[]) {
- return BytesValue.newBuilder().setValue(ByteString.copyFrom((byte[]) object)).build();
+ return BytesValue.of(ByteString.copyFrom((byte[]) object));
}
if (object instanceof Date) {
return converToProtobufTimestamp((Date) object);
@@ -141,21 +134,11 @@ public final class ColumnValueConvertUtil {
throw new RuntimeException(ex);
}
}
- return fromJson(GSON.toJson(object));
+ return StringValue.newBuilder().setValue(object.toString()).build();
}
private static com.google.protobuf.Timestamp converToProtobufTimestamp(final Date timestamp) {
long millis = timestamp.getTime();
return com.google.protobuf.Timestamp.newBuilder().setSeconds(millis / 1000).setNanos((int) ((millis % 1000) * 1000000)).build();
}
-
- private static Message fromJson(final String json) {
- Builder structBuilder = Struct.newBuilder();
- try {
- JsonFormat.parser().ignoringUnknownFields().merge(json, structBuilder);
- } catch (final InvalidProtocolBufferException ex) {
- throw new RuntimeException(ex);
- }
- return structBuilder.build();
- }
}
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 29c835b8b31..b2a3a15a4f3 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -156,7 +156,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
boolean updated = !Objects.equals(newValue, oldValue);
PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(j + 1);
record.addColumn(new Column(columnMetaData.getName(),
- (columnMetaData.isPrimaryKey() && updated) ? handleValue(columnMetaData, oldValue) : null,
+ handleValue(columnMetaData, oldValue),
handleValue(columnMetaData, newValue), updated, columnMetaData.isPrimaryKey()));
}
channel.pushRecord(record);
diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
index 7e9fa85d54d..1137e669816 100644
--- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
@@ -87,8 +87,9 @@ public final class CDCBackendHandler {
return CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, "Not find sharding rule");
}
Map<String, List<DataNode>> actualDataNodesMap = new HashMap<>();
- for (String each : tableNames) {
- actualDataNodesMap.put(each, getActualDataNodes(shardingRule.get(), each));
+ // TODO need support case-insensitive later
+ for (TableName each : createSubscription.getTableNamesList()) {
+ actualDataNodesMap.put(each.getName(), getActualDataNodes(shardingRule.get(), each.getName()));
}
CreateSubscriptionJobParameter parameter = new CreateSubscriptionJobParameter(createSubscription.getDatabase(), tableNames, createSubscription.getSubscriptionName(),
createSubscription.getSubscriptionMode().name(), actualDataNodesMap, createSubscription.getIncrementalGlobalOrderly());
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
index 99e43b5cf4a..b42a3e7bb59 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/base/PipelineBaseE2EIT.java
@@ -150,6 +150,7 @@ public abstract class PipelineBaseE2EIT {
}
try {
connection.createStatement().execute(String.format("DROP DATABASE %s", PROXY_DATABASE));
+ ThreadUtil.sleep(2, TimeUnit.SECONDS);
} catch (final SQLException ex) {
log.warn("Drop proxy database failed, maybe it's not exist. error msg={}", ex.getMessage());
}
@@ -189,7 +190,10 @@ public abstract class PipelineBaseE2EIT {
}
private void createProxyDatabase(final Connection connection) throws SQLException {
- connection.createStatement().execute(String.format("CREATE DATABASE %s", PROXY_DATABASE));
+ String sql = String.format("CREATE DATABASE %s", PROXY_DATABASE);
+ log.info("create proxy database {}", PROXY_DATABASE);
+ connection.createStatement().execute(sql);
+ ThreadUtil.sleep(2, TimeUnit.SECONDS);
}
protected void addResource(final String distSQL) throws SQLException {
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 4c0cf2740c1..d668060cba5 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -117,13 +117,6 @@ public abstract class AbstractMigrationE2EIT extends PipelineBaseE2EIT {
}
protected void addMigrationProcessConfig() throws SQLException {
- if (PipelineEnvTypeEnum.NATIVE == ENV.getItEnvType()) {
- try {
- proxyExecuteWithLog("DROP MIGRATION PROCESS CONFIGURATION '/'", 0);
- } catch (final SQLException ex) {
- log.warn("Drop migration process configuration failed, maybe it's not exist. error msg={}", ex.getMessage());
- }
- }
proxyExecuteWithLog(migrationDistSQLCommand.getAlterMigrationRule(), 0);
}