Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/03/16 09:14:05 UTC
[shardingsphere] branch master updated: Enable CDC E2E IT (#24650)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new da27912ed3f Enable CDC E2E IT (#24650)
da27912ed3f is described below
commit da27912ed3fa851466cbe7638ed6886fa737ec48
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Thu Mar 16 17:13:53 2023 +0800
Enable CDC E2E IT (#24650)
* Enable CDC E2E IT
* Add struct convert and unit test
* Extract batchInsertRecords method
---
.../cdc/client/util/ProtobufAnyValueConverter.java | 5 +
.../client/util/ProtobufAnyValueConverterTest.java | 75 ++++++++
.../cdc/util/DataRecordResultConvertUtilTest.java | 1 +
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 13 +-
.../cases/cdc/DataSourceRecordConsumer.java | 199 +++++++++++++++++++++
.../pipeline/cases/task/E2EIncrementalTask.java | 38 +---
.../e2e/data/pipeline/util/SQLBuilderUtil.java | 77 ++++++++
7 files changed, 372 insertions(+), 36 deletions(-)
diff --git a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverter.java b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverter.java
index c795a88f328..cd5048d4427 100644
--- a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverter.java
+++ b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverter.java
@@ -27,8 +27,10 @@ import com.google.protobuf.Int32Value;
import com.google.protobuf.Int64Value;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.StringValue;
+import com.google.protobuf.Struct;
import com.google.protobuf.UInt32Value;
import com.google.protobuf.UInt64Value;
+import com.google.protobuf.util.JsonFormat;
import lombok.extern.slf4j.Slf4j;
import java.sql.Timestamp;
@@ -83,6 +85,9 @@ public final class ProtobufAnyValueConverter {
if (any.is(com.google.protobuf.Timestamp.class)) {
return converProtobufTimestamp(any.unpack(com.google.protobuf.Timestamp.class));
}
+ if (any.is(Struct.class)) {
+ return JsonFormat.printer().print(any.unpack(Struct.class));
+ }
// TODO can't use JsonFormat, might change the original value without error prompt. there need to cover more types,
log.error("not support unpack value={}", any);
throw new UnsupportedOperationException(String.format("not support unpack the type %s", any.getTypeUrl()));
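For context on the Struct support added above: the converter now unpacks a protobuf Struct from Any and renders it as a JSON string via JsonFormat. A minimal standalone sketch of that round trip, assuming protobuf-java-util is on the classpath (the class name StructConversionExample is illustrative, not part of the commit):

import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import com.google.protobuf.util.JsonFormat;

public final class StructConversionExample {

    public static void main(final String[] args) throws InvalidProtocolBufferException {
        Struct struct = Struct.newBuilder().putFields("number", Value.newBuilder().setNumberValue(123.45D).build()).build();
        Any any = Any.pack(struct);
        // Same conversion as the new branch in ProtobufAnyValueConverter: unpack the Struct, then print it as JSON.
        String json = JsonFormat.printer().print(any.unpack(Struct.class));
        System.out.println(json); // {"number": 123.45} (JsonFormat inserts line breaks by default)
    }
}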
diff --git a/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverterTest.java b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverterTest.java
new file mode 100644
index 00000000000..ac11139c4aa
--- /dev/null
+++ b/kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ProtobufAnyValueConverterTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.util;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.ListValue;
+import com.google.protobuf.NullValue;
+import com.google.protobuf.StringValue;
+import com.google.protobuf.Struct;
+import com.google.protobuf.Struct.Builder;
+import com.google.protobuf.Value;
+import com.google.protobuf.util.JsonFormat;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Timestamp;
+import java.time.OffsetDateTime;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public final class ProtobufAnyValueConverterTest {
+
+ @Test
+ public void assertConvertToObject() throws InvalidProtocolBufferException {
+ Object actual = ProtobufAnyValueConverter.convertToObject(Any.pack(Int32Value.of(123)));
+ assertThat(actual, is(123));
+ actual = ProtobufAnyValueConverter.convertToObject(Any.pack(Int64Value.of(Long.MAX_VALUE)));
+ assertThat(actual, is(Long.MAX_VALUE));
+ OffsetDateTime now = OffsetDateTime.now();
+ actual = ProtobufAnyValueConverter.convertToObject(Any.pack(com.google.protobuf.Timestamp.newBuilder().setSeconds(now.toEpochSecond()).setNanos(now.getNano()).build()));
+ assertThat(actual, is(Timestamp.valueOf(now.toLocalDateTime())));
+ actual = ProtobufAnyValueConverter.convertToObject(Any.pack(FloatValue.of(1.23F)));
+ assertThat(actual, is(1.23F));
+ actual = ProtobufAnyValueConverter.convertToObject(Any.pack(DoubleValue.of(4.56D)));
+ assertThat(actual, is(4.56D));
+ actual = ProtobufAnyValueConverter.convertToObject(Any.pack(StringValue.of("Hello")));
+ assertThat(actual, is("Hello"));
+ actual = ProtobufAnyValueConverter.convertToObject(Any.pack(BoolValue.of(true)));
+ assertTrue((Boolean) actual);
+ actual = ProtobufAnyValueConverter.convertToObject(Any.pack(BytesValue.of(ByteString.copyFrom(new byte[]{1, 2, 3}))));
+ assertThat(actual, is(new byte[]{1, 2, 3}));
+ actual = Struct.newBuilder().putFields("str", Value.newBuilder().setStringValue("ABC defg").build())
+ .putFields("null", Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build())
+ .putFields("number", Value.newBuilder().setNumberValue(123.45D).build())
+ .putFields("list", Value.newBuilder().setListValue(ListValue.newBuilder().addValues(Value.newBuilder().setNumberValue(1)).build()).build()).build();
+ Builder expected = Struct.newBuilder();
+ JsonFormat.parser().merge((String) ProtobufAnyValueConverter.convertToObject(Any.pack((Struct) actual)), expected);
+ assertEquals(actual.toString(), expected.toString());
+ }
+}
diff --git a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilTest.java b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilTest.java
index b351c9e485f..39d418f226e 100644
--- a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilTest.java
+++ b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilTest.java
@@ -64,6 +64,7 @@ public final class DataRecordResultConvertUtilTest {
dataRecord.addColumn(new Column("create_time2", OffsetTime.now(), false, false));
dataRecord.addColumn(new Column("create_datetime", LocalDateTime.now(), false, false));
dataRecord.addColumn(new Column("create_datetime2", OffsetDateTime.now(), false, false));
+ dataRecord.addColumn(new Column("empty", null, false, false));
Blob mockedBlob = mock(Blob.class);
when(mockedBlob.getBytes(anyLong(), anyInt())).thenReturn(new byte[]{-1, 0, 1});
dataRecord.addColumn(new Column("data_blob", mockedBlob, false, false));
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 8c34fef523a..a70dcb628f5 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -35,12 +35,15 @@ import org.apache.shardingsphere.data.pipeline.core.check.consistency.SingleTabl
import org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm.DataMatchDataConsistencyCalculateAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
import org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.E2EIncrementalTask;
import org.apache.shardingsphere.test.e2e.data.pipeline.env.PipelineE2EEnvironment;
import org.apache.shardingsphere.test.e2e.data.pipeline.env.enums.PipelineEnvTypeEnum;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
+import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ECondition;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
@@ -164,8 +167,10 @@ public final class CDCE2EIT {
}
private void startCDCClient(final PipelineContainerComposer containerComposer) {
- // TODO fix later
- StartCDCClientParameter parameter = new StartCDCClientParameter(records -> log.info("records: {}", records));
+ DataSource dataSource = StorageContainerUtil.generateDataSource(containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false)),
+ containerComposer.getUsername(), containerComposer.getPassword());
+ DataSourceRecordConsumer dataSourceRecordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
+ StartCDCClientParameter parameter = new StartCDCClientParameter(dataSourceRecordConsumer);
parameter.setAddress("localhost");
parameter.setPort(containerComposer.getContainerComposer().getProxyCDCPort());
parameter.setUsername(ProxyContainerConstants.USERNAME);
@@ -196,8 +201,6 @@ public final class CDCE2EIT {
}
private static boolean isEnabled() {
- // TODO xinze fix it
- // return PipelineE2ECondition.isEnabled(new MySQLDatabaseType(), new OpenGaussDatabaseType());
- return false;
+ return PipelineE2ECondition.isEnabled(new MySQLDatabaseType(), new OpenGaussDatabaseType());
}
}
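The isEnabled() change above is what actually re-enables the suite: instead of returning false unconditionally, it now delegates to PipelineE2ECondition for MySQL and openGauss. The hunk does not show how CDCE2EIT consumes this predicate, so the wiring below is an assumption: a sketch of how a static isEnabled() gate is commonly attached with JUnit 5's @EnabledIf.

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIf;

// @EnabledIf resolves the named static method and skips the class when it returns false.
@EnabledIf("isEnabled")
class GatedIT {

    static boolean isEnabled() {
        // Hypothetical toggle; the real condition presumably checks the configured E2E database types.
        return Boolean.getBoolean("pipeline.cdc.e2e.enabled");
    }

    @Test
    void assertSomething() {
    }
}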
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
new file mode 100644
index 00000000000..6fcfef5607a
--- /dev/null
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.cases.cdc;
+
+import com.google.common.base.Strings;
+import com.google.protobuf.InvalidProtocolBufferException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
+import org.apache.shardingsphere.data.pipeline.cdc.client.util.ProtobufAnyValueConverter;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.DataChangeType;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.MetaData;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.TableColumn;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.test.e2e.data.pipeline.util.SQLBuilderUtil;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+@Slf4j
+public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
+
+ private final DataSource dataSource;
+
+ private final Map<String, PipelineTableMetaData> tableMetaDataMap;
+
+ private final StandardPipelineTableMetaDataLoader loader;
+
+ public DataSourceRecordConsumer(final DataSource dataSource, final DatabaseType databaseType) {
+ this.dataSource = dataSource;
+ tableMetaDataMap = new ConcurrentHashMap<>();
+ loader = new StandardPipelineTableMetaDataLoader(new PipelineDataSourceWrapper(dataSource, databaseType));
+ }
+
+ @Override
+ public void accept(final List<Record> records) {
+ long insertCount = records.stream().filter(each -> DataChangeType.INSERT == each.getDataChangeType()).count();
+ if (insertCount == records.size()) {
+ batchInsertRecords(records);
+ return;
+ }
+ for (Record record : records) {
+ write(record);
+ }
+ }
+
+ private void batchInsertRecords(final List<Record> records) {
+ Record firstRecord = records.get(0);
+ MetaData metaData = firstRecord.getMetaData();
+ PipelineTableMetaData tableMetaData = loadTableMetaData(metaData.getSchema(), metaData.getTable());
+ List<String> columnNames = firstRecord.getAfterList().stream().map(TableColumn::getName).collect(Collectors.toList());
+ String tableName = buildTableNameWithSchema(metaData.getSchema(), metaData.getTable());
+ String insertSQL = SQLBuilderUtil.buildInsertSQL(columnNames, tableName);
+ try (
+ Connection connection = dataSource.getConnection();
+ PreparedStatement preparedStatement = connection.prepareStatement(insertSQL)) {
+ for (Record each : records) {
+ List<TableColumn> tableColumns = each.getAfterList();
+ for (int i = 0; i < tableColumns.size(); i++) {
+ preparedStatement.setObject(i + 1, convertValueFromAny(tableMetaData, tableColumns.get(i)));
+ }
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ } catch (final SQLException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private void write(final Record record) {
+ String sql = buildSQL(record);
+ MetaData metaData = record.getMetaData();
+ PipelineTableMetaData tableMetaData = loadTableMetaData(metaData.getSchema(), metaData.getTable());
+ try (
+ Connection connection = dataSource.getConnection();
+ PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+ Map<String, TableColumn> afterMap = new LinkedHashMap<>(record.getBeforeList().size(), 1);
+ record.getAfterList().forEach(each -> afterMap.put(each.getName(), each));
+ switch (record.getDataChangeType()) {
+ case INSERT:
+ for (int i = 0; i < record.getAfterCount(); i++) {
+ TableColumn tableColumn = record.getAfterList().get(i);
+ preparedStatement.setObject(i + 1, convertValueFromAny(tableMetaData, tableColumn));
+ }
+ break;
+ case UPDATE:
+ for (int i = 0; i < record.getAfterCount(); i++) {
+ TableColumn tableColumn = record.getAfterList().get(i);
+ preparedStatement.setObject(i + 1, convertValueFromAny(tableMetaData, tableColumn));
+ }
+ preparedStatement.setObject(record.getAfterCount() + 1, convertValueFromAny(tableMetaData, afterMap.get("order_id")));
+ int updateCount = preparedStatement.executeUpdate();
+ if (1 != updateCount) {
+ log.warn("executeUpdate failed, updateCount={}, updateSql={}, updatedColumns={}", updateCount, sql, afterMap.keySet());
+ }
+ break;
+ case DELETE:
+ Object orderId = convertValueFromAny(tableMetaData, afterMap.get("order_id"));
+ preparedStatement.setObject(1, orderId);
+ preparedStatement.execute();
+ break;
+ default:
+ }
+ preparedStatement.execute();
+ } catch (final SQLException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private PipelineTableMetaData loadTableMetaData(final String schemaName, final String tableName) {
+ PipelineTableMetaData result = tableMetaDataMap.get(buildTableNameWithSchema(schemaName, tableName));
+ if (null != result) {
+ return result;
+ }
+ result = loader.getTableMetaData(Strings.emptyToNull(schemaName), tableName);
+ tableMetaDataMap.put(buildTableNameWithSchema(schemaName, tableName), result);
+ return result;
+ }
+
+ private String buildTableNameWithSchema(final String schema, final String tableName) {
+ return schema.isEmpty() ? tableName : String.join(".", schema, tableName);
+ }
+
+ private String buildSQL(final Record record) {
+ List<String> columnNames = record.getAfterList().stream().map(TableColumn::getName).collect(Collectors.toList());
+ MetaData metaData = record.getMetaData();
+ String tableName = buildTableNameWithSchema(metaData.getSchema(), metaData.getTable());
+ switch (record.getDataChangeType()) {
+ case INSERT:
+ return SQLBuilderUtil.buildInsertSQL(columnNames, tableName);
+ case UPDATE:
+ return SQLBuilderUtil.buildUpdateSQL(columnNames, tableName, "?");
+ case DELETE:
+ return SQLBuilderUtil.buildDeleteSQL(tableName, "order_id");
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private Object convertValueFromAny(final PipelineTableMetaData tableMetaData, final TableColumn tableColumn) {
+ PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(tableColumn.getName());
+ Object result;
+ try {
+ result = ProtobufAnyValueConverter.convertToObject(tableColumn.getValue());
+ } catch (final InvalidProtocolBufferException ex) {
+ log.error("invalid protocol message value: {}", tableColumn.getValue());
+ throw new RuntimeException(ex);
+ }
+ if (null == result) {
+ return null;
+ }
+ switch (columnMetaData.getDataType()) {
+ case Types.TIME:
+ if ("TIME".equalsIgnoreCase(columnMetaData.getDataTypeName())) {
+ // Time.valueOf() will lose nanos
+ return LocalTime.ofNanoOfDay((Long) result);
+ }
+ return result;
+ case Types.DATE:
+ if ("DATE".equalsIgnoreCase(columnMetaData.getDataTypeName())) {
+ LocalDate localDate = LocalDate.ofEpochDay((Long) result);
+ return Date.valueOf(localDate);
+ }
+ return result;
+ default:
+ return result;
+ }
+ }
+}
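A note on the temporal handling at the end of DataSourceRecordConsumer: the converter assumes TIME values arrive as a Long holding nanoseconds of day and DATE values as a Long holding epoch days, and it rebuilds java.time objects directly rather than going through Time.valueOf(), which would drop nanos. A tiny self-contained illustration of those two decodings (the sample values are arbitrary):

import java.sql.Date;
import java.time.LocalDate;
import java.time.LocalTime;

public final class TemporalDecodeExample {

    public static void main(final String[] args) {
        // 3_600_000_000_000 nanos of day == 01:00, with sub-second precision preserved.
        System.out.println(LocalTime.ofNanoOfDay(3_600_000_000_000L));
        // Epoch day 19432 == 2023-03-16.
        System.out.println(Date.valueOf(LocalDate.ofEpochDay(19432L)));
    }
}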
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
index eadb185cdf7..e8a67c75c5f 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.BaseIncrementTask;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
import org.apache.shardingsphere.test.e2e.data.pipeline.util.DataSourceExecuteUtil;
+import org.apache.shardingsphere.test.e2e.data.pipeline.util.SQLBuilderUtil;
import javax.sql.DataSource;
import java.math.BigDecimal;
@@ -87,35 +88,20 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
private void insertOrder(final Object[] orderInsertData) {
String sql;
if (databaseType instanceof MySQLDatabaseType) {
- sql = buildInsertSQL(MYSQL_COLUMN_NAMES);
+ sql = SQLBuilderUtil.buildInsertSQL(MYSQL_COLUMN_NAMES, orderTableName);
} else if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
- sql = buildInsertSQL(POSTGRESQL_COLUMN_NAMES);
+ sql = SQLBuilderUtil.buildInsertSQL(POSTGRESQL_COLUMN_NAMES, orderTableName);
} else {
throw new UnsupportedOperationException();
}
DataSourceExecuteUtil.execute(dataSource, sql, orderInsertData);
}
- private String buildInsertSQL(final List<String> columnNames) {
- StringBuilder sql = new StringBuilder("INSERT INTO %s (");
- for (String each : columnNames) {
- sql.append(each).append(",");
- }
- sql.setLength(sql.length() - 1);
- sql.append(") ").append("VALUES").append("(");
- for (int i = 0; i < columnNames.size(); i++) {
- sql.append("?,");
- }
- sql.setLength(sql.length() - 1);
- sql.append(")");
- return String.format(sql.toString(), orderTableName);
- }
-
private void updateOrderById(final Object orderId) {
ThreadLocalRandom random = ThreadLocalRandom.current();
int randomInt = random.nextInt(-100, 100);
if (databaseType instanceof MySQLDatabaseType) {
- String sql = String.format(buildUpdateSQL(ignoreShardingColumns(MYSQL_COLUMN_NAMES), "?"), orderTableName);
+ String sql = SQLBuilderUtil.buildUpdateSQL(ignoreShardingColumns(MYSQL_COLUMN_NAMES), orderTableName, "?");
log.info("update sql: {}", sql);
int randomUnsignedInt = random.nextInt(10, 100);
LocalDateTime now = LocalDateTime.now();
@@ -125,7 +111,7 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
return;
}
if (databaseType instanceof PostgreSQLDatabaseType || databaseType instanceof OpenGaussDatabaseType) {
- String sql = String.format(buildUpdateSQL(ignoreShardingColumns(POSTGRESQL_COLUMN_NAMES), "?"), orderTableName);
+ String sql = SQLBuilderUtil.buildUpdateSQL(ignoreShardingColumns(POSTGRESQL_COLUMN_NAMES), orderTableName, "?");
log.info("update sql: {}", sql);
DataSourceExecuteUtil.execute(dataSource, sql, new Object[]{"中文测试", randomInt, BigDecimal.valueOf(10000), true, new byte[]{}, "char", "varchar", PipelineCaseHelper.generateFloat(),
PipelineCaseHelper.generateDouble(), PipelineCaseHelper.generateJsonString(10, true), PipelineCaseHelper.generateJsonString(20, true), "text-update", LocalDate.now(),
@@ -133,28 +119,18 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
}
}
- private String buildUpdateSQL(final List<String> columnNames, final String placeholder) {
- StringBuilder sql = new StringBuilder("UPDATE %s SET ");
- for (String each : columnNames) {
- sql.append(each).append("=").append(placeholder).append(",");
- }
- sql.setLength(sql.length() - 1);
- sql.append(" WHERE order_id=?");
- return sql.toString();
- }
-
private List<String> ignoreShardingColumns(final List<String> columnNames) {
return new ArrayList<>(columnNames.subList(2, columnNames.size()));
}
private void deleteOrderById(final Object orderId) {
- String sql = String.format("DELETE FROM %s WHERE order_id = ?", orderTableName);
+ String sql = SQLBuilderUtil.buildDeleteSQL(orderTableName, "order_id");
DataSourceExecuteUtil.execute(dataSource, sql, new Object[]{orderId});
}
private void setNullToAllFields(final Object orderId) {
if (databaseType instanceof MySQLDatabaseType) {
- String sql = String.format(buildUpdateSQL(ignoreShardingColumns(MYSQL_COLUMN_NAMES), "null"), orderTableName);
+ String sql = SQLBuilderUtil.buildUpdateSQL(ignoreShardingColumns(MYSQL_COLUMN_NAMES), orderTableName, "null");
DataSourceExecuteUtil.execute(dataSource, sql, new Object[]{orderId});
}
}
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/util/SQLBuilderUtil.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/util/SQLBuilderUtil.java
new file mode 100644
index 00000000000..11cda8f7661
--- /dev/null
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/util/SQLBuilderUtil.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.data.pipeline.util;
+
+import java.util.List;
+
+/**
+ * SQL builder util.
+ */
+public final class SQLBuilderUtil {
+
+ /**
+ * Build insert SQL.
+ *
+ * @param columnNames column names
+ * @param tableName table name
+ * @return insert SQL
+ */
+ public static String buildInsertSQL(final List<String> columnNames, final String tableName) {
+ StringBuilder result = new StringBuilder("INSERT INTO %s (");
+ for (String each : columnNames) {
+ result.append(each).append(",");
+ }
+ result.setLength(result.length() - 1);
+ result.append(") ").append("VALUES").append("(");
+ for (int i = 0; i < columnNames.size(); i++) {
+ result.append("?,");
+ }
+ result.setLength(result.length() - 1);
+ result.append(")");
+ return String.format(result.toString(), tableName);
+ }
+
+ /**
+ * Build update SQL.
+ *
+ * @param columnNames column names
+ * @param tableName table name
+ * @param placeholder placeholder
+ * @return update SQL
+ */
+ public static String buildUpdateSQL(final List<String> columnNames, final String tableName, final String placeholder) {
+ StringBuilder result = new StringBuilder("UPDATE %s SET ");
+ for (String each : columnNames) {
+ result.append(each).append("=").append(placeholder).append(",");
+ }
+ result.setLength(result.length() - 1);
+ result.append(" WHERE order_id=?");
+ return String.format(result.toString(), tableName);
+ }
+
+ /**
+ * Build delete SQL.
+ *
+ * @param tableName table name
+ * @param primaryKeyName primary key name
+ * @return delete SQL
+ */
+ public static String buildDeleteSQL(final String tableName, final String primaryKeyName) {
+ return String.format("DELETE FROM %s WHERE %s = ?", tableName, primaryKeyName);
+ }
+}
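For reference, the output the extracted builders produce (table and column names below are illustrative; the example assumes it sits in the same package as SQLBuilderUtil):

import java.util.Arrays;

public final class SQLBuilderUtilExample {

    public static void main(final String[] args) {
        // INSERT INTO t_order (order_id,user_id,status) VALUES(?,?,?)
        System.out.println(SQLBuilderUtil.buildInsertSQL(Arrays.asList("order_id", "user_id", "status"), "t_order"));
        // UPDATE t_order SET status=? WHERE order_id=?
        System.out.println(SQLBuilderUtil.buildUpdateSQL(Arrays.asList("status"), "t_order", "?"));
        // DELETE FROM t_order WHERE order_id = ?
        System.out.println(SQLBuilderUtil.buildDeleteSQL("t_order", "order_id"));
    }
}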