You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/01/04 07:22:55 UTC
[shardingsphere] branch master updated: CDC client add openGauss importer implementation (#23320)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a9d6c23064b CDC client add openGauss importer implementation (#23320)
a9d6c23064b is described below
commit a9d6c23064b96aae161f726353892335db7098e0
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Wed Jan 4 15:22:48 2023 +0800
CDC client add openGauss importer implementation (#23320)
* CDC client add openGauss importer
* Fix codestyle
* Fix codestyle
---
kernel/data-pipeline/cdc/client/pom.xml | 4 +-
.../data/pipeline/cdc/client/CDCClient.java | 3 +-
.../cdc/client/handler/LoginRequestHandler.java | 1 +
.../client/handler/SubscriptionRequestHandler.java | 42 +++---
.../Importer.java} | 48 +++----
.../cdc/client/importer/ImporterFactory.java | 40 ++++++
.../cdc/client/importer/OpenGaussImporter.java | 126 ++++++++++++++++++
.../client/parameter/StartCDCClientParameter.java | 2 +
.../cdc/client/sqlbuilder/AbstractSQLBuilder.java | 147 +++++++++++++++++++++
.../cdc/client/sqlbuilder/OpenGaussSQLBuilder.java | 66 +++++++++
.../SQLBuilder.java} | 57 ++++----
.../cdc/client/sqlbuilder/SQLBuilderFactory.java | 39 ++++++
.../pipeline/cdc/client/util/AnyValueConvert.java | 107 +++++++++++++++
.../cdc/client/example/opengauss/Bootstrap.java | 49 +++++++
.../src/test/resources/env/opengauss.properties | 22 +++
.../pipeline/cdc/core/importer/CDCImporter.java | 6 +-
.../importer/connector/CDCImporterConnector.java | 18 ++-
.../data/pipeline/cdc/core/job/CDCJob.java | 4 +-
.../pipeline/cdc/core/task/CDCTasksRunner.java | 40 ++++++
.../cdc/util/DataRecordResultConvertUtil.java | 15 ++-
.../src/main/proto/CDCResponseProtocol.proto | 1 +
.../core/task/InventoryIncrementalTasksRunner.java | 20 +--
22 files changed, 757 insertions(+), 100 deletions(-)
diff --git a/kernel/data-pipeline/cdc/client/pom.xml b/kernel/data-pipeline/cdc/client/pom.xml
index 6433c17c06f..13da97f129b 100644
--- a/kernel/data-pipeline/cdc/client/pom.xml
+++ b/kernel/data-pipeline/cdc/client/pom.xml
@@ -44,8 +44,8 @@
<version>${netty.version}</version>
</dependency>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
+ <groupId>org.opengauss</groupId>
+ <artifactId>opengauss-jdbc</artifactId>
</dependency>
</dependencies>
</project>
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
index 4f2acf1fd90..29e8dc33f38 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
@@ -89,8 +89,7 @@ public final class CDCClient {
channel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
channel.pipeline().addLast(new ProtobufEncoder());
channel.pipeline().addLast(new LoginRequestHandler(parameter.getUsername(), parameter.getPassword()));
- channel.pipeline().addLast(new SubscriptionRequestHandler(parameter.getDatabase(), parameter.getSubscriptionName(), parameter.getSubscribeTables(),
- parameter.getSubscriptionMode(), parameter.isIncrementalGlobalOrderly()));
+ channel.pipeline().addLast(new SubscriptionRequestHandler(parameter));
}
});
ChannelFuture future = bootstrap.connect(address, port).sync();
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
index 2372f37fc86..f844cb13393 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
@@ -56,6 +56,7 @@ public final class LoginRequestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(final ChannelHandlerContext ctx) {
ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).setIfAbsent(null);
+ ctx.fireChannelInactive();
}
@Override
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
index 17855103394..8ac5fbe063a 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
@@ -19,18 +19,18 @@ package org.apache.shardingsphere.data.pipeline.cdc.client.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
+import org.apache.shardingsphere.data.pipeline.cdc.client.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.cdc.client.importer.ImporterFactory;
+import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtil;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckRequest;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Builder;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartSubscriptionRequest;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
@@ -43,24 +43,22 @@ import java.util.List;
* Subscription request handler.
*/
@Slf4j
-@RequiredArgsConstructor
public final class SubscriptionRequestHandler extends ChannelInboundHandlerAdapter {
- private final String database;
+ private final StartCDCClientParameter parameter;
- private final String subscriptionName;
+ private final Importer importer;
- private final List<TableName> subscribeTables;
-
- private final SubscriptionMode subscribeMode;
-
- private final boolean incrementalGlobalOrderly;
+ public SubscriptionRequestHandler(final StartCDCClientParameter parameter) {
+ this.parameter = parameter;
+ importer = ImporterFactory.getImporter(parameter.getDatabaseType());
+ }
@Override
public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
if (evt instanceof CreateSubscriptionEvent) {
- CreateSubscriptionRequest createSubscriptionRequest = CreateSubscriptionRequest.newBuilder().setDatabase(database).setSubscriptionMode(subscribeMode).setSubscriptionName(subscriptionName)
- .addAllTableNames(subscribeTables).setIncrementalGlobalOrderly(incrementalGlobalOrderly).build();
+ CreateSubscriptionRequest createSubscriptionRequest = CreateSubscriptionRequest.newBuilder().setDatabase(parameter.getDatabase()).setSubscriptionMode(parameter.getSubscriptionMode())
+ .setSubscriptionName(parameter.getSubscriptionName()).addAllTableNames(parameter.getSubscribeTables()).setIncrementalGlobalOrderly(parameter.isIncrementalGlobalOrderly()).build();
CDCRequest request = CDCRequest.newBuilder().setCreateSubscription(createSubscriptionRequest).setRequestId(RequestIdUtil.generateRequestId()).build();
ctx.writeAndFlush(request);
}
@@ -88,7 +86,7 @@ public final class SubscriptionRequestHandler extends ChannelInboundHandlerAdapt
private void sendCreateSubscriptionRequest(final ChannelHandlerContext ctx, final CDCResponse response, final ClientConnectionContext connectionContext) {
log.info("create subscription succeed, subscription name {}, exist {}", response.getCreateSubscriptionResult().getSubscriptionName(), response.getCreateSubscriptionResult().getExisting());
- StartSubscriptionRequest startSubscriptionRequest = StartSubscriptionRequest.newBuilder().setDatabase(database).setSubscriptionName(subscriptionName).build();
+ StartSubscriptionRequest startSubscriptionRequest = StartSubscriptionRequest.newBuilder().setDatabase(parameter.getDatabase()).setSubscriptionName(parameter.getSubscriptionName()).build();
Builder builder = CDCRequest.newBuilder().setRequestId(RequestIdUtil.generateRequestId()).setStartSubscription(startSubscriptionRequest);
ctx.writeAndFlush(builder.build());
connectionContext.setStatus(ClientConnectionStatus.CREATING_SUBSCRIPTION);
@@ -101,11 +99,25 @@ public final class SubscriptionRequestHandler extends ChannelInboundHandlerAdapt
private void subscribeDataRecords(final ChannelHandlerContext ctx, final DataRecordResult result) {
List<Record> recordsList = result.getRecordsList();
- log.debug("received records {}", recordsList);
+ for (Record each : recordsList) {
+ try {
+ importer.write(each);
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ log.error("write data failed", ex);
+ }
+ }
// TODO data needs to be processed, such as writing to a database
ctx.channel().writeAndFlush(CDCRequest.newBuilder().setAckRequest(AckRequest.newBuilder().setAckId(result.getAckId()).build()).build());
}
+ @Override
+ public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+ importer.close();
+ ctx.fireChannelInactive();
+ }
+
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
log.error("subscription handler error", cause);
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/Importer.java
similarity index 50%
copy from kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
copy to kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/Importer.java
index 90d0ab31c6e..281860264f8 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/Importer.java
@@ -15,37 +15,27 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.cdc.client.parameter;
+package org.apache.shardingsphere.data.pipeline.cdc.client.importer;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
-
-import java.util.List;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
/**
- * Start CDC client parameter.
+ * Importer.
*/
-@Getter
-@Setter
-public final class StartCDCClientParameter {
-
- private String address;
-
- private int port;
-
- private String username;
-
- private String password;
-
- private String database;
-
- private List<TableName> subscribeTables;
-
- private String subscriptionName;
-
- private SubscriptionMode subscriptionMode = SubscriptionMode.INCREMENTAL;
-
- private boolean incrementalGlobalOrderly;
+public interface Importer {
+
+ /**
+ * Write record.
+ *
+ * @param record record
+ * @throws Exception exception
+ */
+ void write(Record record) throws Exception;
+
+ /**
+ * Close importer.
+ *
+ * @throws Exception exception
+ */
+ void close() throws Exception;
}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/ImporterFactory.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/ImporterFactory.java
new file mode 100644
index 00000000000..68a356d3fd2
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/ImporterFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.importer;
+
+/**
+ * Importer factory.
+ */
+public final class ImporterFactory {
+
+ /**
+ * Get importer.
+ *
+ * @param databaseType database type
+ * @return importer
+ */
+ // TODO use SPI
+ public static Importer getImporter(final String databaseType) {
+ switch (databaseType) {
+ case "openGauss":
+ return new OpenGaussImporter();
+ default:
+ return null;
+ }
+ }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/OpenGaussImporter.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/OpenGaussImporter.java
new file mode 100644
index 00000000000..7ff1c61f183
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/OpenGaussImporter.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.importer;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.ProtocolStringList;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder.SQLBuilder;
+import org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder.SQLBuilderFactory;
+import org.apache.shardingsphere.data.pipeline.cdc.client.util.AnyValueConvert;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * OpenGauss importer.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public final class OpenGaussImporter implements Importer {
+
+ private final SQLBuilder sqlBuilder = SQLBuilderFactory.getSQLBuilder("openGauss");
+
+ private final Connection connection;
+
+ public OpenGaussImporter() {
+ Properties properties = new Properties();
+ try (InputStream inputStream = OpenGaussImporter.class.getClassLoader().getResourceAsStream("env/opengauss.properties")) {
+ properties.load(inputStream);
+ String url = properties.getProperty("url");
+ String port = properties.getProperty("port");
+ String database = properties.getProperty("database");
+ String username = properties.getProperty("username");
+ String password = properties.getProperty("password");
+ connection = DriverManager.getConnection(String.format("jdbc:opengauss://%s:%s/%s", url, port, database), username, password);
+ } catch (final IOException | SQLException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public void write(final Record record) throws SQLException, InvalidProtocolBufferException {
+ Optional<String> sqlOptional = buildSQL(record);
+ if (!sqlOptional.isPresent()) {
+ log.error("build sql failed, record {}", record);
+ throw new RuntimeException("build sql failed");
+ }
+ String sql = sqlOptional.get();
+ try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+ List<Any> afterValue = new ArrayList<>(record.getAfterMap().values());
+ ProtocolStringList uniqueKeyNamesList = record.getTableMetaData().getUniqueKeyNamesList();
+ List<String> conditionColumnNames = record.getBeforeMap().keySet().containsAll(uniqueKeyNamesList) ? uniqueKeyNamesList : new ArrayList<>(record.getBeforeMap().keySet());
+ switch (record.getDataChangeType()) {
+ case INSERT:
+ for (int i = 0; i < afterValue.size(); i++) {
+ preparedStatement.setObject(i + 1, AnyValueConvert.convertToObject(afterValue.get(i)));
+ }
+ break;
+ case UPDATE:
+ for (int i = 0; i < afterValue.size(); i++) {
+ preparedStatement.setObject(i + 1, AnyValueConvert.convertToObject(afterValue.get(i)));
+ }
+ for (int i = 0; i < conditionColumnNames.size(); i++) {
+ preparedStatement.setObject(afterValue.size() + i + 1, AnyValueConvert.convertToObject(record.getBeforeMap().get(conditionColumnNames.get(i))));
+ }
+ int updateCount = preparedStatement.executeUpdate();
+ if (1 != updateCount) {
+ log.warn("executeUpdate failed, updateCount={}, updateSql={}, updatedColumns={}, conditionColumns={}", updateCount, sql, record.getAfterMap().keySet(), conditionColumnNames);
+ }
+ break;
+ case DELETE:
+ for (int i = 0; i < conditionColumnNames.size(); i++) {
+ preparedStatement.setObject(i + 1, AnyValueConvert.convertToObject(record.getAfterMap().get(conditionColumnNames.get(i))));
+ }
+ preparedStatement.execute();
+ break;
+ default:
+ }
+ preparedStatement.execute();
+ }
+ }
+
+ private Optional<String> buildSQL(final Record record) {
+ switch (record.getDataChangeType()) {
+ case INSERT:
+ return Optional.ofNullable(sqlBuilder.buildInsertSQL(record));
+ case UPDATE:
+ return Optional.ofNullable(sqlBuilder.buildUpdateSQL(record));
+ case DELETE:
+ return Optional.ofNullable(sqlBuilder.buildDeleteSQL(record));
+ default:
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public void close() throws SQLException {
+ connection.close();
+ }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
index 90d0ab31c6e..0377c1f9f55 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
@@ -31,6 +31,8 @@ import java.util.List;
@Setter
public final class StartCDCClientParameter {
+ private String databaseType;
+
private String address;
private int port;
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/AbstractSQLBuilder.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/AbstractSQLBuilder.java
new file mode 100644
index 00000000000..467097b8838
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/AbstractSQLBuilder.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
+
+import com.google.common.base.Strings;
+import lombok.Getter;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.TableMetaData;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Abstract SQL builder.
+ */
+public abstract class AbstractSQLBuilder implements SQLBuilder {
+
+ protected static final String INSERT_SQL_CACHE_KEY_PREFIX = "INSERT_";
+
+ protected static final String UPDATE_SQL_CACHE_KEY_PREFIX = "UPDATE_";
+
+ protected static final String DELETE_SQL_CACHE_KEY_PREFIX = "DELETE_";
+
+ @Getter
+ private final ConcurrentMap<String, String> sqlCacheMap = new ConcurrentHashMap<>();
+
+ /**
+ * Wrap the item in the left and right identifier quote strings when it is a keyword.
+ *
+ * @param item item to quote
+ * @return quoted item if it is a keyword, otherwise the item unchanged
+ */
+ public String quote(final String item) {
+ return isKeyword(item) ? getLeftIdentifierQuoteString() + item + getRightIdentifierQuoteString() : item;
+ }
+
+ protected abstract boolean isKeyword(String item);
+
+ /**
+ * Get left identifier quote string.
+ *
+ * @return left identifier quote string
+ */
+ protected abstract String getLeftIdentifierQuoteString();
+
+ /**
+ * Get right identifier quote string.
+ *
+ * @return right identifier quote string
+ */
+ protected abstract String getRightIdentifierQuoteString();
+
+ protected final String getQualifiedTableName(final String schemaName, final String tableName) {
+ StringBuilder result = new StringBuilder();
+ if (!Strings.isNullOrEmpty(schemaName)) {
+ result.append(quote(schemaName)).append(".");
+ }
+ result.append(quote(tableName));
+ return result.toString();
+ }
+
+ @Override
+ public String buildInsertSQL(final Record record) {
+ String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX + record.getTableMetaData().getTableName();
+ if (!sqlCacheMap.containsKey(sqlCacheKey)) {
+ sqlCacheMap.put(sqlCacheKey, buildInsertSQLInternal(record));
+ }
+ return sqlCacheMap.get(sqlCacheKey);
+ }
+
+ private String buildInsertSQLInternal(final Record record) {
+ StringBuilder columnsLiteral = new StringBuilder();
+ StringBuilder holder = new StringBuilder();
+ for (String each : record.getAfterMap().keySet()) {
+ columnsLiteral.append(String.format("%s,", quote(each)));
+ holder.append("?,");
+ }
+ columnsLiteral.setLength(columnsLiteral.length() - 1);
+ holder.setLength(holder.length() - 1);
+ TableMetaData tableMetaData = record.getTableMetaData();
+ return String.format("INSERT INTO %s(%s) VALUES(%s)", getQualifiedTableName(tableMetaData.getSchema(), tableMetaData.getTableName()), columnsLiteral, holder);
+ }
+
+ @Override
+ public String buildUpdateSQL(final Record record) {
+ TableMetaData tableMetaData = record.getTableMetaData();
+ String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX + tableMetaData.getTableName();
+ if (!sqlCacheMap.containsKey(sqlCacheKey)) {
+ sqlCacheMap.put(sqlCacheKey, buildUpdateSQLInternal(tableMetaData.getSchema(), tableMetaData.getTableName(), record.getBeforeMap().keySet(), tableMetaData.getUniqueKeyNamesList()));
+ }
+ StringBuilder updatedColumnString = new StringBuilder();
+ for (String each : record.getAfterMap().keySet()) {
+ updatedColumnString.append(String.format("%s = ?,", quote(each)));
+ }
+ updatedColumnString.setLength(updatedColumnString.length() - 1);
+ return String.format(sqlCacheMap.get(sqlCacheKey), updatedColumnString);
+ }
+
+ private String buildUpdateSQLInternal(final String schemaName, final String tableName, final Collection<String> columnNames, final Collection<String> uniqueKeyNames) {
+ return String.format("UPDATE %s SET %%s WHERE %s", getQualifiedTableName(schemaName, tableName), buildWhereSQL(columnNames, uniqueKeyNames));
+ }
+
+ private String buildWhereSQL(final Collection<String> columnNames, final Collection<String> uniqueKeyNames) {
+ StringBuilder where = new StringBuilder();
+ for (String each : columnNames.containsAll(uniqueKeyNames) ? uniqueKeyNames : columnNames) {
+ where.append(String.format("%s = ? and ", quote(each)));
+ }
+ where.setLength(where.length() - 5);
+ return where.toString();
+ }
+
+ /**
+ * Build delete SQL.
+ *
+ * @param record record
+ * @return delete SQL
+ */
+ @Override
+ public String buildDeleteSQL(final Record record) {
+ TableMetaData tableMetaData = record.getTableMetaData();
+ String sqlCacheKey = DELETE_SQL_CACHE_KEY_PREFIX + tableMetaData.getTableName();
+ if (!sqlCacheMap.containsKey(sqlCacheKey)) {
+ sqlCacheMap.put(sqlCacheKey, buildDeleteSQLInternal(tableMetaData.getSchema(), tableMetaData.getTableName(), record.getBeforeMap().keySet(), tableMetaData.getUniqueKeyNamesList()));
+ }
+ return sqlCacheMap.get(sqlCacheKey);
+ }
+
+ private String buildDeleteSQLInternal(final String schemaName, final String tableName, final Collection<String> columnNames, final Collection<String> uniqueKeyNames) {
+ return String.format("DELETE FROM %s WHERE %s", getQualifiedTableName(schemaName, tableName), buildWhereSQL(columnNames, uniqueKeyNames));
+ }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilder.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilder.java
new file mode 100644
index 00000000000..bb0132b27a0
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/OpenGaussSQLBuilder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
+
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Pipeline SQL builder of openGauss.
+ */
+public final class OpenGaussSQLBuilder extends AbstractSQLBuilder {
+
+ private static final List<String> RESERVED_KEYWORDS = Arrays.asList("ALL", "ANALYSE", "ANALYZE", "AND", "ANY", "ARRAY", "AS", "ASC", "ASYMMETRIC", "AUTHID", "AUTHORIZATION", "BETWEEN", "BIGINT",
+ "BINARY", "BINARY_DOUBLE", "BINARY_INTEGER", "BIT", "BOOLEAN", "BOTH", "BUCKETCNT", "BUCKETS", "BYTEAWITHOUTORDER", "BYTEAWITHOUTORDERWITHEQUAL", "CASE", "CAST", "CHAR", "CHARACTER",
+ "CHECK", "COALESCE", "COLLATE", "COLLATION", "COLUMN", "COMPACT", "CONCURRENTLY", "CONSTRAINT", "CREATE", "CROSS", "CSN", "CURRENT_CATALOG", "CURRENT_DATE", "CURRENT_ROLE",
+ "CURRENT_SCHEMA", "CURRENT_TIME", "CURRENT_TIMESTAMP", "CURRENT_USER", "DATE", "DEC", "DECIMAL", "DECODE", "DEFAULT", "DEFERRABLE", "DELTAMERGE", "DESC", "DISTINCT", "DO", "ELSE", "END",
+ "EXCEPT", "EXCLUDED", "EXISTS", "EXTRACT", "FALSE", "FENCED", "FETCH", "FLOAT", "FOR", "FOREIGN", "FREEZE", "FROM", "FULL", "GRANT", "GREATEST", "GROUP", "GROUPING", "GROUPPARENT",
+ "HAVING", "HDFSDIRECTORY", "ILIKE", "IN", "INITIALLY", "INNER", "INOUT", "INT", "INTEGER", "INTERSECT", "INTERVAL", "INTO", "IS", "JOIN", "LEADING", "LEAST", "LEFT", "LESS", "LIKE",
+ "LIMIT", "LOCALTIME", "LOCALTIMESTAMP", "MAXVALUE", "MINUS", "MODIFY", "NATIONAL", "NATURAL", "NCHAR", "NOCYCLE", "NONE", "NOT", "NOTNULL", "NULL", "NULLIF", "NUMBER", "NUMERIC",
+ "NVARCHAR", "NVARCHAR2", "NVL", "OFFSET", "ON", "ONLY", "OR", "ORDER", "OUT", "OUTER", "OVERLAPS", "OVERLAY", "PERFORMANCE", "PLACING", "POSITION", "PRECISION", "PRIMARY", "PRIORER",
+ "PROCEDURE", "REAL", "RECYCLEBIN", "REFERENCES", "REJECT", "RETURNING", "RIGHT", "ROW", "ROWNUM", "SELECT", "SESSION_USER", "SETOF", "SIMILAR", "SMALLDATETIME", "SMALLINT", "SOME",
+ "SUBSTRING", "SYMMETRIC", "SYSDATE", "TABLE", "TABLESAMPLE", "THEN", "TIME", "TIMECAPSULE", "TIMESTAMP", "TIMESTAMPDIFF", "TINYINT", "TO", "TRAILING", "TREAT", "TRIM", "TRUE", "UNION",
+ "UNIQUE", "USER", "USING", "VALUES", "VARCHAR", "VARCHAR2", "VARIADIC", "VERBOSE", "VERIFY", "WHEN", "WHERE", "WINDOW", "WITH", "XMLATTRIBUTES", "XMLCONCAT", "XMLELEMENT", "XMLEXISTS",
+ "XMLFOREST", "XMLPARSE", "XMLPI", "XMLROOT", "XMLSERIALIZE");
+
+ @Override
+ protected boolean isKeyword(final String item) {
+ return RESERVED_KEYWORDS.contains(item.toUpperCase());
+ }
+
+ @Override
+ protected String getLeftIdentifierQuoteString() {
+ return "\"";
+ }
+
+ @Override
+ protected String getRightIdentifierQuoteString() {
+ return "\"";
+ }
+
+ @Override
+ public String buildInsertSQL(final Record record) {
+ String insertSql = super.buildInsertSQL(record);
+ if (!record.getTableMetaData().getUniqueKeyNamesList().isEmpty()) {
+ return insertSql + " ON DUPLICATE KEY UPDATE NOTHING";
+ }
+ return insertSql;
+ }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilder.java
similarity index 50%
copy from kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
copy to kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilder.java
index 90d0ab31c6e..33d698a33fa 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilder.java
@@ -15,37 +15,36 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.cdc.client.parameter;
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
-
-import java.util.List;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
/**
- * Start CDC client parameter.
+ * SQL builder.
*/
-@Getter
-@Setter
-public final class StartCDCClientParameter {
-
- private String address;
-
- private int port;
-
- private String username;
-
- private String password;
-
- private String database;
-
- private List<TableName> subscribeTables;
-
- private String subscriptionName;
-
- private SubscriptionMode subscriptionMode = SubscriptionMode.INCREMENTAL;
-
- private boolean incrementalGlobalOrderly;
+public interface SQLBuilder {
+
+ /**
+ * Build insert SQL.
+ *
+ * @param record data record
+ * @return insert SQL
+ */
+ String buildInsertSQL(Record record);
+
+ /**
+ * Build update SQL.
+ *
+ * @param record data record
+ * @return update SQL
+ */
+ String buildUpdateSQL(Record record);
+
+ /**
+ * Build delete SQL.
+ *
+ * @param record data record
+ * @return delete SQL
+ */
+ String buildDeleteSQL(Record record);
}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilderFactory.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilderFactory.java
new file mode 100644
index 00000000000..934a2768c3a
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilderFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
+
+/**
+ * SQL builder factory.
+ */
+public final class SQLBuilderFactory {
+
+    // Static factory only, instances are never needed.
+    private SQLBuilderFactory() {
+    }
+
+    /**
+     * Get SQL builder.
+     *
+     * @param databaseType database type, matched case-sensitively; only {@code "openGauss"} is supported for now
+     * @return SQL builder
+     * @throws NullPointerException if database type is null
+     * @throws UnsupportedOperationException if database type is not supported
+     */
+    public static SQLBuilder getSQLBuilder(final String databaseType) {
+        // A switch on a null string throws a NullPointerException without any message, so fail early with an explicit one.
+        java.util.Objects.requireNonNull(databaseType, "databaseType must not be null");
+        switch (databaseType) {
+            case "openGauss":
+                return new OpenGaussSQLBuilder();
+            default:
+                throw new UnsupportedOperationException(String.format("Not supported %s now", databaseType));
+        }
+    }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/AnyValueConvert.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/AnyValueConvert.java
new file mode 100644
index 00000000000..71f39a31cec
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/AnyValueConvert.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.util;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.StringValue;
+import com.google.protobuf.util.JsonFormat;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigDecimalValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigIntegerValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BlobValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ClobValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.LocalTimeValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.NullValue;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalTime;
+
+/**
+ * Any value convert.
+ */
+public final class AnyValueConvert {
+
+    // Utility class with static methods only, instances are never needed.
+    private AnyValueConvert() {
+    }
+
+    /**
+     * Convert any to object.
+     *
+     * <p>The packed message type decides the result:</p>
+     * <ul>
+     * <li>null input or {@code NullValue}: null</li>
+     * <li>{@code StringValue}, {@code Int32Value}, {@code Int64Value}, {@code FloatValue}, {@code DoubleValue}, {@code BoolValue}, {@code ClobValue}: the wrapped value</li>
+     * <li>{@code BigIntegerValue}: {@link BigInteger} built from the wrapped bytes</li>
+     * <li>{@code BigDecimalValue}: {@link BigDecimal} built from the wrapped value</li>
+     * <li>{@code BytesValue}, {@code BlobValue}: byte array copy of the wrapped bytes</li>
+     * <li>protobuf {@code Timestamp}: {@link Timestamp}, keeping the nanosecond part</li>
+     * <li>{@code LocalTimeValue}: {@link LocalTime} parsed from the wrapped text</li>
+     * <li>anything else: the JSON text printed by {@code JsonFormat}, including default value fields</li>
+     * </ul>
+     *
+     * @param any any
+     * @return object
+     * @throws InvalidProtocolBufferException invalid protocol buffer exception
+     */
+    public static Object convertToObject(final Any any) throws InvalidProtocolBufferException {
+        if (null == any || any.is(NullValue.class)) {
+            return null;
+        }
+        if (any.is(StringValue.class)) {
+            return any.unpack(StringValue.class).getValue();
+        }
+        if (any.is(Int32Value.class)) {
+            return any.unpack(Int32Value.class).getValue();
+        }
+        if (any.is(Int64Value.class)) {
+            return any.unpack(Int64Value.class).getValue();
+        }
+        if (any.is(BigIntegerValue.class)) {
+            return new BigInteger(any.unpack(BigIntegerValue.class).getValue().toByteArray());
+        }
+        if (any.is(FloatValue.class)) {
+            return any.unpack(FloatValue.class).getValue();
+        }
+        if (any.is(DoubleValue.class)) {
+            return any.unpack(DoubleValue.class).getValue();
+        }
+        if (any.is(BigDecimalValue.class)) {
+            return new BigDecimal(any.unpack(BigDecimalValue.class).getValue());
+        }
+        if (any.is(BoolValue.class)) {
+            return any.unpack(BoolValue.class).getValue();
+        }
+        if (any.is(BytesValue.class)) {
+            return any.unpack(BytesValue.class).getValue().toByteArray();
+        }
+        if (any.is(com.google.protobuf.Timestamp.class)) {
+            return convertProtobufTimestamp(any.unpack(com.google.protobuf.Timestamp.class));
+        }
+        if (any.is(LocalTimeValue.class)) {
+            return LocalTime.parse(any.unpack(LocalTimeValue.class).getValue());
+        }
+        if (any.is(ClobValue.class)) {
+            return any.unpack(ClobValue.class).getValue();
+        }
+        if (any.is(BlobValue.class)) {
+            return any.unpack(BlobValue.class).getValue().toByteArray();
+        }
+        // No dedicated mapping for this message type, fall back to its JSON representation.
+        return JsonFormat.printer().includingDefaultValueFields().print(any);
+    }
+
+    /**
+     * Convert protobuf timestamp to SQL timestamp.
+     *
+     * @param timestamp protobuf timestamp carrying seconds and nanos
+     * @return SQL timestamp for the same instant
+     */
+    private static Timestamp convertProtobufTimestamp(final com.google.protobuf.Timestamp timestamp) {
+        // Timestamp.from keeps the nanosecond part; going through epoch milliseconds would truncate it.
+        return Timestamp.from(Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()));
+    }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
new file mode 100644
index 00000000000..f7c81de30e5
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.example.opengauss;
+
+import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
+import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
+
+import java.util.Collections;
+
+public final class Bootstrap {
+
+    /**
+     * Main entrance.
+     *
+     * <p>Creates a CDC client with the example parameter and starts it.</p>
+     *
+     * @param args args
+     */
+    public static void main(final String[] args) {
+        new CDCClient(createParameter()).start();
+    }
+
+    /**
+     * Create the example start parameter: full subscription of table t_order in sharding_db on 127.0.0.1:33071 for openGauss.
+     *
+     * @return start CDC client parameter
+     */
+    private static StartCDCClientParameter createParameter() {
+        StartCDCClientParameter result = new StartCDCClientParameter();
+        // Server endpoint and credentials.
+        result.setAddress("127.0.0.1");
+        result.setPort(33071);
+        result.setUsername("root");
+        result.setPassword("root");
+        // What to subscribe and how.
+        result.setDatabase("sharding_db");
+        result.setDatabaseType("openGauss");
+        TableName orderTable = TableName.newBuilder().setName("t_order").build();
+        result.setSubscribeTables(Collections.singletonList(orderTable));
+        result.setSubscriptionName("subscribe_sharding_db");
+        result.setSubscriptionMode(SubscriptionMode.FULL);
+        result.setIncrementalGlobalOrderly(true);
+        return result;
+    }
+}
diff --git a/kernel/data-pipeline/cdc/client/src/test/resources/env/opengauss.properties b/kernel/data-pipeline/cdc/client/src/test/resources/env/opengauss.properties
new file mode 100644
index 00000000000..88f4616e123
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/test/resources/env/opengauss.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+url=127.0.0.1
+port=5432
+database=cdc_db
+username=gaussdb
+password=Root@123
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index 5b7f7c08272..9b066b6dee8 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -117,7 +117,9 @@ public final class CDCImporter extends AbstractLifecycleExecutor implements Impo
@Override
protected void doStop() {
- importerConnector.clean();
- CDCAckHolder.getInstance().cleanUp(this);
+ if (ImporterType.INCREMENTAL == importerType) {
+ importerConnector.clean(this);
+ CDCAckHolder.getInstance().cleanUp(this);
+ }
}
}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
index 257f2324d71..881629588ba 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
@@ -65,7 +65,7 @@ public final class CDCImporterConnector implements ImporterConnector {
private final Condition condition = lock.newCondition();
@Setter
- private volatile boolean running = true;
+ private volatile boolean incrementalTaskRunning = true;
@Getter
private final String database;
@@ -127,10 +127,10 @@ public final class CDCImporterConnector implements ImporterConnector {
}
private void writeImmediately(final List<? extends Record> recordList, final Map<CDCImporter, CDCAckPosition> importerDataRecordMap) {
- while (!channel.isWritable() && channel.isActive() && running) {
+ while (!channel.isWritable() && channel.isActive()) {
doAwait();
}
- if (!channel.isActive() || !running) {
+ if (!channel.isActive()) {
return;
}
List<DataRecordResult.Record> records = new LinkedList<>();
@@ -189,10 +189,14 @@ public final class CDCImporterConnector implements ImporterConnector {
/**
* Clean CDC importer connector.
+ *
+ * @param cdcImporter CDC importer
*/
- public void clean() {
- running = false;
- incrementalRecordMap.clear();
+ public void clean(final CDCImporter cdcImporter) {
+ incrementalRecordMap.remove(cdcImporter);
+ if (ImporterType.INCREMENTAL == cdcImporter.getImporterType()) {
+ incrementalTaskRunning = false;
+ }
}
@Override
@@ -207,7 +211,7 @@ public final class CDCImporterConnector implements ImporterConnector {
@Override
public void run() {
- while (running) {
+ while (incrementalTaskRunning) {
Map<CDCImporter, CDCAckPosition> cdcAckPositionMap = new HashMap<>();
List<DataRecord> dataRecords = new LinkedList<>();
for (int i = 0; i < batchSize; i++) {
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index f3272de2ceb..6479e96d100 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -29,11 +29,11 @@ import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfigurat
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import org.apache.shardingsphere.data.pipeline.cdc.context.job.CDCJobItemContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
+import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -71,7 +71,7 @@ public final class CDCJob extends AbstractSimplePipelineJob {
protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext pipelineJobItemContext) {
InventoryIncrementalJobItemContext jobItemContext = (InventoryIncrementalJobItemContext) pipelineJobItemContext;
- return new InventoryIncrementalTasksRunner(jobItemContext, jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
+ return new CDCTasksRunner(jobItemContext, jobItemContext.getInventoryTasks(), jobItemContext.getIncrementalTasks());
}
@Override
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
new file mode 100644
index 00000000000..1497458c92d
--- /dev/null
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCTasksRunner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.task;
+
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+
+import java.util.Collection;
+
+/**
+ * CDC tasks runner.
+ */
+public final class CDCTasksRunner extends InventoryIncrementalTasksRunner {
+
+ /**
+ * Create CDC tasks runner, all arguments are handed to the parent constructor unchanged.
+ *
+ * @param jobItemContext pipeline job item context
+ * @param inventoryTasks inventory tasks
+ * @param incrementalTasks incremental tasks
+ */
+ public CDCTasksRunner(final PipelineJobItemContext jobItemContext, final Collection<InventoryTask> inventoryTasks, final Collection<IncrementalTask> incrementalTasks) {
+ super(jobItemContext, inventoryTasks, incrementalTasks);
+ }
+
+ /**
+ * Inventory success callback, executes the incremental task directly.
+ *
+ * <p>Unlike the parent implementation, this override does not check whether all inventory tasks have finished before doing so.</p>
+ */
+ @Override
+ protected void inventorySuccessCallback() {
+ executeIncrementalTask();
+ }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtil.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtil.java
index 9108ed6af2e..4e85c3b2e14 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtil.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtil.java
@@ -27,7 +27,9 @@ import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordR
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.TableMetaData;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
/**
@@ -44,13 +46,18 @@ public final class DataRecordResultConvertUtil {
* @return record
*/
public static Record convertDataRecordToRecord(final String database, final String schema, final DataRecord dataRecord) {
- Map<String, Any> beforeMap = new HashMap<>();
- Map<String, Any> afterMap = new HashMap<>();
+ Map<String, Any> beforeMap = new LinkedHashMap<>();
+ Map<String, Any> afterMap = new LinkedHashMap<>();
+ List<String> uniqueKeyNames = new LinkedList<>();
for (Column column : dataRecord.getColumns()) {
beforeMap.put(column.getName(), Any.pack(ColumnValueConvertUtil.convertToProtobufMessage(column.getOldValue())));
afterMap.put(column.getName(), Any.pack(ColumnValueConvertUtil.convertToProtobufMessage(column.getValue())));
+ if (column.isUniqueKey()) {
+ uniqueKeyNames.add(column.getName());
+ }
}
- TableMetaData metaData = TableMetaData.newBuilder().setDatabase(database).setSchema(Strings.nullToEmpty(schema)).setTableName(dataRecord.getTableName()).build();
+ TableMetaData metaData = TableMetaData.newBuilder().setDatabase(database).setSchema(Strings.nullToEmpty(schema)).setTableName(dataRecord.getTableName())
+ .addAllUniqueKeyNames(uniqueKeyNames).build();
DataChangeType dataChangeType = DataChangeType.UNKNOWN;
if (IngestDataChangeType.INSERT.equals(dataRecord.getType())) {
dataChangeType = DataChangeType.INSERT;
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
index 49c82f09433..dd9a8158918 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
@@ -83,6 +83,7 @@ message DataRecordResult {
string database = 1;
optional string schema = 2;
string table_name = 3;
+ repeated string unique_key_names = 4;
}
TableMetaData table_meta_data = 3;
int64 transaction_commit_millis = 4;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 86b4fce8465..59b18b69389 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -40,7 +40,7 @@ import java.util.concurrent.CompletableFuture;
*/
@RequiredArgsConstructor
@Slf4j
-public final class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
+public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
@Getter
private final PipelineJobItemContext jobItemContext;
@@ -102,7 +102,7 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
jobAPI.updateJobItemStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
}
- private synchronized void executeIncrementalTask() {
+ protected synchronized void executeIncrementalTask() {
if (incrementalTasks.isEmpty()) {
log.info("incrementalTasks empty, ignore");
return;
@@ -122,16 +122,20 @@ public final class InventoryIncrementalTasksRunner implements PipelineTasksRunne
ExecuteEngine.trigger(futures, new IncrementalExecuteCallback());
}
+ protected void inventorySuccessCallback() {
+ if (PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
+ log.info("onSuccess, all inventory tasks finished.");
+ executeIncrementalTask();
+ } else {
+ log.info("onSuccess, inventory tasks not finished");
+ }
+ }
+
private final class InventoryTaskExecuteCallback implements ExecuteCallback {
@Override
public void onSuccess() {
- if (PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
- log.info("onSuccess, all inventory tasks finished.");
- executeIncrementalTask();
- } else {
- log.info("onSuccess, inventory tasks not finished");
- }
+ inventorySuccessCallback();
}
@Override