You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/05/15 02:24:05 UTC
[shardingsphere] branch master updated: Add test fields for openGauss at pipeline E2E (#25608)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d67ba69b5b4 Add test fields for openGauss at pipeline E2E (#25608)
d67ba69b5b4 is described below
commit d67ba69b5b4e38af3fbd7c0c849c875ee3d97f07
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Mon May 15 10:23:57 2023 +0800
Add test fields for openGauss at pipeline E2E (#25608)
* Add test fields for openGauss/PostgreSQL at pipeline E2E
* Add jdbcUrl parameter for fix bit(n) column migration
* Data match add array equals method
* Improve column names
---
...DataMatchDataConsistencyCalculateAlgorithm.java | 3 +
.../OpenGaussJdbcQueryPropertiesExtension.java | 2 +
.../ingest/wal/OpenGaussColumnValueReader.java} | 48 ++++++-------
...ta.pipeline.spi.ingest.dumper.ColumnValueReader | 18 +++++
.../OpenGaussJdbcQueryPropertiesExtensionTest.java | 2 +-
.../ingest/PostgreSQLColumnValueReader.java | 9 ---
.../pipeline/cases/PipelineContainerComposer.java | 54 +++++++-------
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 6 +-
.../cases/migration/AbstractMigrationE2EIT.java | 8 +--
.../general/PostgreSQLMigrationGeneralE2EIT.java | 8 +--
.../pipeline/cases/task/E2EIncrementalTask.java | 29 ++++++--
.../framework/helper/PipelineCaseHelper.java | 22 +++++-
.../resources/env/scenario/general/opengauss.xml | 82 ++++++++++++++++++++++
13 files changed, 210 insertions(+), 81 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index c6ea5031b44..01b30ae060f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -42,6 +42,7 @@ import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import java.math.BigDecimal;
+import java.sql.Array;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -296,6 +297,8 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
matched = ((SQLXML) thisResult).getString().equals(((SQLXML) thatResult).getString());
} else if (thisResult instanceof BigDecimal && thatResult instanceof BigDecimal) {
matched = DataConsistencyCheckUtils.isBigDecimalEquals((BigDecimal) thisResult, (BigDecimal) thatResult);
+ } else if (thisResult instanceof Array && thatResult instanceof Array) {
+ matched = Objects.deepEquals(((Array) thisResult).getArray(), ((Array) thatResult).getArray());
} else {
matched = equalsBuilder.append(thisResult, thatResult).isEquals();
}
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtension.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtension.java
index c2952349a32..616631728d9 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtension.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtension.java
@@ -30,6 +30,8 @@ public final class OpenGaussJdbcQueryPropertiesExtension implements JdbcQueryPro
@Override
public Properties extendQueryProperties() {
+ queryProps.setProperty("stringtype", "unspecified");
+ queryProps.setProperty("bitToString", "true");
return queryProps;
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussColumnValueReader.java
similarity index 54%
copy from kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
copy to kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussColumnValueReader.java
index a84844018bb..b299be37ac0 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/OpenGaussColumnValueReader.java
@@ -15,62 +15,58 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
+package org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractColumnValueReader;
-import org.postgresql.util.PGobject;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
-import java.util.Collection;
-import java.util.Collections;
/**
- * Column value reader for PostgreSQL.
+ * Column value reader for openGauss.
*/
-public final class PostgreSQLColumnValueReader extends AbstractColumnValueReader {
+public final class OpenGaussColumnValueReader extends AbstractColumnValueReader {
- private static final Collection<String> TYPE_ALIASES = Collections.singletonList("openGauss");
+ private static final String MONEY_TYPE = "money";
- private static final String PG_MONEY_TYPE = "money";
+ private static final String BIT_TYPE = "bit";
- private static final String PG_BIT_TYPE = "bit";
+ private static final String BOOL_TYPE = "bool";
@Override
protected Object doReadValue(final ResultSet resultSet, final ResultSetMetaData metaData, final int columnIndex) throws SQLException {
- if (isPgMoneyType(metaData, columnIndex)) {
+ if (isMoneyType(metaData, columnIndex)) {
return resultSet.getBigDecimal(columnIndex);
}
- if (isPgBitType(metaData, columnIndex)) {
- PGobject result = new PGobject();
- result.setType("bit");
- Object bitValue = resultSet.getObject(columnIndex);
- result.setValue(null == bitValue ? null : (Boolean) bitValue ? "1" : "0");
- return result;
+ if (isBitType(metaData, columnIndex)) {
+ // openGauss JDBC driver can't parse bit(n) correctly when n > 1, so JDBC url already add bitToString, there just return string
+ return resultSet.getString(columnIndex);
+ }
+ if (isBoolType(metaData, columnIndex)) {
+ return resultSet.getBoolean(columnIndex);
}
return super.defaultDoReadValue(resultSet, metaData, columnIndex);
}
- private boolean isPgMoneyType(final ResultSetMetaData resultSetMetaData, final int index) throws SQLException {
- return PG_MONEY_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
+ private boolean isMoneyType(final ResultSetMetaData resultSetMetaData, final int index) throws SQLException {
+ return MONEY_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
+ }
+
+ private boolean isBoolType(final ResultSetMetaData resultSetMetaData, final int index) throws SQLException {
+ return BOOL_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
}
- private boolean isPgBitType(final ResultSetMetaData resultSetMetaData, final int index) throws SQLException {
+ private boolean isBitType(final ResultSetMetaData resultSetMetaData, final int index) throws SQLException {
if (Types.BIT == resultSetMetaData.getColumnType(index)) {
- return PG_BIT_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
+ return BIT_TYPE.equalsIgnoreCase(resultSetMetaData.getColumnTypeName(index));
}
return false;
}
@Override
public String getType() {
- return "PostgreSQL";
- }
-
- @Override
- public Collection<String> getTypeAliases() {
- return TYPE_ALIASES;
+ return "openGauss";
}
}
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
new file mode 100644
index 00000000000..c2616b6fa02
--- /dev/null
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussColumnValueReader
diff --git a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtensionTest.java b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtensionTest.java
index 97a7227bd65..9968d05953d 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtensionTest.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/test/java/org/apache/shardingsphere/data/pipeline/opengauss/datasource/OpenGaussJdbcQueryPropertiesExtensionTest.java
@@ -40,6 +40,6 @@ class OpenGaussJdbcQueryPropertiesExtensionTest {
private void assertExtension(final JdbcQueryPropertiesExtension actual) {
assertThat(actual, instanceOf(OpenGaussJdbcQueryPropertiesExtension.class));
assertThat(actual.getType(), equalTo("openGauss"));
- assertTrue(actual.extendQueryProperties().isEmpty());
+ assertThat(actual.extendQueryProperties().size(), equalTo(2));
}
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
index a84844018bb..0cfdbdb6cdb 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
@@ -24,16 +24,12 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
-import java.util.Collection;
-import java.util.Collections;
/**
* Column value reader for PostgreSQL.
*/
public final class PostgreSQLColumnValueReader extends AbstractColumnValueReader {
- private static final Collection<String> TYPE_ALIASES = Collections.singletonList("openGauss");
-
private static final String PG_MONEY_TYPE = "money";
private static final String PG_BIT_TYPE = "bit";
@@ -68,9 +64,4 @@ public final class PostgreSQLColumnValueReader extends AbstractColumnValueReader
public String getType() {
return "PostgreSQL";
}
-
- @Override
- public Collection<String> getTypeAliases() {
- return TYPE_ALIASES;
- }
}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 19b903cf565..99a8fb2fe11 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -123,7 +123,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
}
extraSQLCommand = JAXB.unmarshal(Objects.requireNonNull(PipelineContainerComposer.class.getClassLoader().getResource(testParam.getScenario())), ExtraSQLCommand.class);
containerComposer.start();
- sourceDataSource = StorageContainerUtils.generateDataSource(appendExtraParameter(getActualJdbcUrlTemplate(DS_0, false)), username, password);
+ sourceDataSource = StorageContainerUtils.generateDataSource(getActualJdbcUrlTemplate(DS_0, false), username, password);
proxyDataSource = StorageContainerUtils.generateDataSource(
appendExtraParameter(containerComposer.getProxyJdbcUrl(PROXY_DATABASE)), ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD);
init(jobType);
@@ -198,7 +198,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
/**
* Append extra parameter.
- *
+ *
* @param jdbcUrl JDBC URL
* @return appended JDBC URL
*/
@@ -207,14 +207,14 @@ public final class PipelineContainerComposer implements AutoCloseable {
return new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("rewriteBatchedStatements", Boolean.TRUE.toString())));
}
if (DatabaseTypeUtils.isPostgreSQL(databaseType) || DatabaseTypeUtils.isOpenGauss(databaseType)) {
- return new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("stringtype", "unspecified")));
+ return new JdbcUrlAppender().appendQueryProperties(jdbcUrl, PropertiesBuilder.build(new Property("stringtype", "unspecified"), new Property("bitToString", Boolean.TRUE.toString())));
}
return jdbcUrl;
}
/**
* Register storage unit.
- *
+ *
* @param storageUnitName storage unit name
* @throws SQLException SQL exception
*/
@@ -222,13 +222,13 @@ public final class PipelineContainerComposer implements AutoCloseable {
String registerStorageUnitTemplate = "REGISTER STORAGE UNIT ${ds} ( URL='${url}', USER='${user}', PASSWORD='${password}')".replace("${ds}", storageUnitName)
.replace("${user}", getUsername())
.replace("${password}", getPassword())
- .replace("${url}", appendExtraParameter(getActualJdbcUrlTemplate(storageUnitName, true)));
+ .replace("${url}", getActualJdbcUrlTemplate(storageUnitName, true));
proxyExecuteWithLog(registerStorageUnitTemplate, 2);
}
/**
* Add resource.
- *
+ *
* @param distSQL dist SQL
* @throws SQLException SQL exception
*/
@@ -239,7 +239,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
/**
* Get actual JDBC URL template.
- *
+ *
* @param databaseName database name
* @param isInContainer is in container
* @param storageContainerIndex storage container index
@@ -257,18 +257,18 @@ public final class PipelineContainerComposer implements AutoCloseable {
/**
* Get actual JDBC URL template.
- *
+ *
* @param databaseName database name
* @param isInContainer is in container
* @return actual JDBC URL template
*/
public String getActualJdbcUrlTemplate(final String databaseName, final boolean isInContainer) {
- return getActualJdbcUrlTemplate(databaseName, isInContainer, 0);
+ return appendExtraParameter(getActualJdbcUrlTemplate(databaseName, isInContainer, 0));
}
/**
* Create schema.
- *
+ *
* @param connection connection
* @param sleepSeconds sleep seconds
* @throws SQLException SQL exception
@@ -286,8 +286,8 @@ public final class PipelineContainerComposer implements AutoCloseable {
/**
* Create source order table.
- *
- * @param sourceTableName source table name
+ *
+ * @param sourceTableName source table name
* @throws SQLException SQL exception
*/
public void createSourceOrderTable(final String sourceTableName) throws SQLException {
@@ -296,7 +296,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
/**
* Create source table index list.
- *
+ *
* @param schema schema
* @param sourceTableName source table name
* @throws SQLException SQL exception
@@ -311,7 +311,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
/**
* Create source comment on list.
- *
+ *
* @param schema schema
* @param sourceTableName source table name
* @throws SQLException SQL exception
@@ -322,7 +322,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
/**
* Create source order item table.
- *
+ *
* @throws SQLException SQL exception
*/
public void createSourceOrderItemTable() throws SQLException {
@@ -331,7 +331,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
/**
* Source execute with log.
- *
+ *
* @param sql SQL
* @throws SQLException SQL exception
*/
@@ -344,7 +344,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
/**
* Proxy execute with log.
- *
+ *
* @param sql SQL
* @param sleepSeconds sleep seconds
* @throws SQLException SQL exception
@@ -360,7 +360,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
/**
* Wait job prepare success.
- *
+ *
* @param distSQL dist SQL
*/
@SneakyThrows(InterruptedException.class)
@@ -376,7 +376,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
/**
* Query for list with log.
- *
+ *
* @param sql SQL
* @return query result
* @throws RuntimeException runtime exception
@@ -399,7 +399,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
/**
* Transform result set to list.
- *
+ *
* @param resultSet result set
* @return transformed result
* @throws SQLException SQL exception
@@ -420,7 +420,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
/**
* Start increment task.
- *
+ *
* @param baseIncrementTask base increment task
*/
public void startIncrementTask(final BaseIncrementTask baseIncrementTask) {
@@ -430,7 +430,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
/**
* Wait increment task finished.
- *
+ *
* @param distSQL dist SQL
* @return result
* @throws InterruptedException interrupted exception
@@ -464,7 +464,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
/**
* Assert proxy order record exist.
- *
+ *
* @param tableName table name
* @param orderId order id
*/
@@ -480,7 +480,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
/**
* Assert proxy order record exist.
- *
+ *
* @param sql SQL
*/
@SneakyThrows(InterruptedException.class)
@@ -499,7 +499,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
/**
* Get target table records count.
- *
+ *
* @param tableName table name
* @return target table records count
*/
@@ -511,7 +511,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
/**
* Assert greater than order table init rows.
- *
+ *
* @param tableInitRows table init rows
* @param schema schema
*/
@@ -523,7 +523,7 @@ public final class PipelineContainerComposer implements AutoCloseable {
/**
* Generate ShardingSphere data source from proxy.
- *
+ *
* @return ShardingSphere data source
* @throws SQLException SQL exception
*/
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 133bd0e8495..1967d7d97be 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -79,7 +79,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
*/
@PipelineE2ESettings(database = {
@PipelineE2EDatabaseSettings(type = "MySQL", scenarioFiles = "env/scenario/general/mysql.xml"),
- @PipelineE2EDatabaseSettings(type = "openGauss", scenarioFiles = "env/scenario/general/postgresql.xml")})
+ @PipelineE2EDatabaseSettings(type = "openGauss", scenarioFiles = "env/scenario/general/opengauss.xml")})
@Slf4j
class CDCE2EIT {
@@ -108,7 +108,7 @@ class CDCE2EIT {
}
createOrderTableRule(containerComposer);
try (Connection connection = containerComposer.getProxyDataSource().getConnection()) {
- initSchemaAndTable(containerComposer, connection, 2);
+ initSchemaAndTable(containerComposer, connection, 3);
}
DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
Pair<List<Object[]>, List<Object[]>> dataPair = PipelineCaseHelper.generateFullInsertData(containerComposer.getDatabaseType(), PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
@@ -170,7 +170,7 @@ class CDCE2EIT {
}
private void startCDCClient(final PipelineContainerComposer containerComposer) {
- DataSource dataSource = StorageContainerUtils.generateDataSource(containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false)),
+ DataSource dataSource = StorageContainerUtils.generateDataSource(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
containerComposer.getUsername(), containerComposer.getPassword());
StartCDCClientParameter parameter = new StartCDCClientParameter();
parameter.setAddress("localhost");
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index a3fa80535d2..60d7960e9ed 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -61,16 +61,16 @@ public abstract class AbstractMigrationE2EIT {
}
String addSourceResource = migrationDistSQL.getRegisterMigrationSourceStorageUnitTemplate().replace("${user}", containerComposer.getUsername())
.replace("${password}", containerComposer.getPassword())
- .replace("${ds0}", containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_0, true)));
+ .replace("${ds0}", containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_0, true));
containerComposer.addResource(addSourceResource);
}
protected void addMigrationTargetResource(final PipelineContainerComposer containerComposer) throws SQLException {
String addTargetResource = migrationDistSQL.getRegisterMigrationTargetStorageUnitTemplate().replace("${user}", containerComposer.getUsername())
.replace("${password}", containerComposer.getPassword())
- .replace("${ds2}", containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_2, true)))
- .replace("${ds3}", containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_3, true)))
- .replace("${ds4}", containerComposer.appendExtraParameter(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, true)));
+ .replace("${ds2}", containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_2, true))
+ .replace("${ds3}", containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_3, true))
+ .replace("${ds4}", containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, true));
containerComposer.addResource(addTargetResource);
List<Map<String, Object>> resources = containerComposer.queryForListWithLog("SHOW STORAGE UNITS from sharding_db");
assertThat(resources.size(), is(3));
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index f518c43e58d..51fff681a5c 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -47,11 +47,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@PipelineE2ESettings(database = {
@PipelineE2EDatabaseSettings(type = "PostgreSQL", scenarioFiles = "env/scenario/general/postgresql.xml"),
- @PipelineE2EDatabaseSettings(type = "openGauss", scenarioFiles = "env/scenario/general/postgresql.xml")})
+ @PipelineE2EDatabaseSettings(type = "openGauss", scenarioFiles = "env/scenario/general/opengauss.xml")})
@Slf4j
class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
- private static final String SOURCE_TABLE_NAME = "t_order_copy";
+ private static final String SOURCE_TABLE_NAME = "t_order";
private static final String TARGET_TABLE_NAME = "t_order";
@@ -97,12 +97,12 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
private void checkOrderMigration(final PipelineContainerComposer containerComposer, final String jobId) throws SQLException, InterruptedException {
containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
stopMigrationByJobId(containerComposer, jobId);
+ // must refresh firstly, otherwise proxy can't get schema and table info
+ containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
long recordId = new SnowflakeKeyGenerateAlgorithm().generateKey();
containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')",
String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME), recordId, 1, "afterStop"));
startMigrationByJobId(containerComposer, jobId);
- // must refresh firstly, otherwise proxy can't get schema and table info
- containerComposer.proxyExecuteWithLog("REFRESH TABLE METADATA;", 2);
Awaitility.await().atMost(5L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog(
String.format("SELECT * FROM %s WHERE order_id = %s", String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId)).isEmpty());
containerComposer.assertProxyOrderRecordExist(String.join(".", PipelineContainerComposer.SCHEMA_NAME, TARGET_TABLE_NAME), recordId);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
index 0e211b4e7e6..72049d85340 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/task/E2EIncrementalTask.java
@@ -19,9 +19,11 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.task;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.digest.DigestUtils;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.type.SchemaSupportedDatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
import org.apache.shardingsphere.test.e2e.data.pipeline.cases.base.BaseIncrementTask;
import org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
@@ -40,6 +42,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@RequiredArgsConstructor
@@ -53,6 +56,10 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
private static final List<String> POSTGRESQL_COLUMN_NAMES = Arrays.asList("order_id", "user_id", "status", "t_int2", "t_numeric", "t_bool", "t_bytea", "t_char", "t_varchar", "t_float",
"t_double", "t_json", "t_jsonb", "t_text", "t_date", "t_time", "t_timestamp", "t_timestamptz");
+ private static final List<String> OPENGAUSS_COLUMN_NAMES = Arrays.asList("order_id", "user_id", "status", "c_int", "c_smallint", "c_float", "c_double", "c_numeric", "c_boolean", "c_char",
+ "c_text", "c_bytea", "c_date", "c_time", "c_smalldatetime", "c_timestamp", "c_timestamptz", "c_interval", "c_array", "c_json", "c_jsonb", "c_uuid", "c_hash32", "c_tsvector", "c_bit",
+ "c_int4range", "c_reltime", "c_abstime", "c_point", "c_lseg", "c_box", "c_circle", "c_bitvarying", "c_cidr", "c_inet", "c_macaddr", "c_hll");
+
private final DataSource dataSource;
private final String orderTableName;
@@ -88,8 +95,10 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
String sql;
if (databaseType instanceof MySQLDatabaseType) {
sql = SQLBuilderUtils.buildInsertSQL(MYSQL_COLUMN_NAMES, orderTableName);
- } else if (databaseType instanceof SchemaSupportedDatabaseType) {
+ } else if (databaseType instanceof PostgreSQLDatabaseType) {
sql = SQLBuilderUtils.buildInsertSQL(POSTGRESQL_COLUMN_NAMES, orderTableName);
+ } else if (databaseType instanceof OpenGaussDatabaseType) {
+ sql = SQLBuilderUtils.buildInsertSQL(OPENGAUSS_COLUMN_NAMES, orderTableName);
} else {
throw new UnsupportedOperationException();
}
@@ -110,13 +119,25 @@ public final class E2EIncrementalTask extends BaseIncrementTask {
DataSourceExecuteUtils.execute(dataSource, sql, parameters);
return;
}
- if (databaseType instanceof SchemaSupportedDatabaseType) {
+ if (databaseType instanceof PostgreSQLDatabaseType) {
String sql = SQLBuilderUtils.buildUpdateSQL(ignoreShardingColumns(POSTGRESQL_COLUMN_NAMES), orderTableName, "?");
- Object[] parameters = {"中文测试", randomInt, BigDecimal.valueOf(10000), true, new byte[]{}, "char", "varchar", PipelineCaseHelper.generateFloat(),
+ Object[] parameters = {"中文测试", randomInt, BigDecimal.valueOf(10000), random.nextBoolean(), new byte[]{}, "char", "varchar", PipelineCaseHelper.generateFloat(),
PipelineCaseHelper.generateDouble(), PipelineCaseHelper.generateJsonString(10, true), PipelineCaseHelper.generateJsonString(20, true), "text-update", LocalDate.now(),
LocalTime.now(), Timestamp.valueOf(LocalDateTime.now()), OffsetDateTime.now(), orderId};
log.info("update sql: {}, params: {}", sql, parameters);
DataSourceExecuteUtils.execute(dataSource, sql, parameters);
+ return;
+ }
+ if (databaseType instanceof OpenGaussDatabaseType) {
+ LocalDateTime now = LocalDateTime.now();
+ String sql = SQLBuilderUtils.buildUpdateSQL(ignoreShardingColumns(OPENGAUSS_COLUMN_NAMES), orderTableName, "?");
+ Object[] parameters = {"中文测试", randomInt, random.nextInt(-999, 999), PipelineCaseHelper.generateFloat(), PipelineCaseHelper.generateDouble(), BigDecimal.valueOf(10000),
+ random.nextBoolean(), "update-char", "update-text", "update-bytea".getBytes(), now.toLocalDate().plusDays(1), now.toLocalTime().plusHours(6), "2023-03-01", now,
+ OffsetDateTime.now(), "1 years 1 mons 1 days 1 hours 1 mins 1 secs", "{4, 5, 6}", PipelineCaseHelper.generateJsonString(1, true), PipelineCaseHelper.generateJsonString(1, false),
+ UUID.randomUUID().toString(), DigestUtils.md5Hex(now.toString()), null, "1111", "[1,10000)", "2 years 2 mons 2 days 06:00:00", "2023-01-01 00:00:00+00", "(2.0,2.0)",
+ "[(0.0,0.0),(3.0,3.0)]", "(1.0,1.0),(3.0,3.0)", "<(5.0,5.0),1.0>", "1010", "192.168.0.0/24", "192.168.1.1", "08:00:3b:01:02:03", null, orderId};
+ log.info("update sql: {}, params: {}", sql, parameters);
+ DataSourceExecuteUtils.execute(dataSource, sql, parameters);
}
}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
index c6b9cf1ec82..04a3400cb46 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/framework/helper/PipelineCaseHelper.java
@@ -21,11 +21,13 @@ import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.type.SchemaSupportedDatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
import org.apache.shardingsphere.test.e2e.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
@@ -41,6 +43,7 @@ import java.time.OffsetDateTime;
import java.time.Year;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@@ -91,7 +94,7 @@ public final class PipelineCaseHelper {
}
return result;
}
- if (databaseType instanceof SchemaSupportedDatabaseType) {
+ if (databaseType instanceof PostgreSQLDatabaseType) {
for (int i = 0; i < insertRows; i++) {
Object orderId = keyGenerateAlgorithm.generateKey();
result.add(new Object[]{orderId, generateInt(0, 100), generateString(6), generateInt(-128, 127),
@@ -101,6 +104,19 @@ public final class PipelineCaseHelper {
}
return result;
}
+ if (databaseType instanceof OpenGaussDatabaseType) {
+ for (int i = 0; i < insertRows; i++) {
+ Object orderId = keyGenerateAlgorithm.generateKey();
+ // TODO openGauss mpp plugin parses single quotes incorrectly
+ result.add(new Object[]{orderId, generateInt(0, 1000), "status" + i, generateInt(-1000, 9999), generateInt(0, 100), generateFloat(), generateDouble(),
+ BigDecimal.valueOf(generateDouble()), false, generateString(6), "texts", "bytea".getBytes(), LocalDate.now(), LocalTime.now(), "2001-10-01",
+ Timestamp.valueOf(LocalDateTime.now()), OffsetDateTime.now(), "0 years 0 mons 1 days 2 hours 3 mins 4 secs", "{1, 2, 3}", generateJsonString(8, false),
+ generateJsonString(8, true), UUID.randomUUID().toString(), DigestUtils.md5Hex(orderId.toString()), null, "0000", "[1,1000)",
+ "1 years 1 mons 10 days -06:00:00", "2000-01-02 00:00:00+00", "(1.0,1.0)", "[(0.0,0.0),(2.0,2.0)]", "(3.0,3.0),(1.0,1.0)", "<(5.0,5.0),5.0>", "1111",
+ "192.168.0.0/16", "192.168.1.1", "08:00:2b:01:02:03", "\\x484c4c00000000002b05000000000000000000000000000000000000"});
+ }
+ return result;
+ }
throw new UnsupportedOperationException("now support generate %s insert data");
}
@@ -109,7 +125,7 @@ public final class PipelineCaseHelper {
}
private static String generateString(final int strLength) {
- return RandomStringUtils.randomAlphabetic(strLength);
+ return RandomStringUtils.randomAlphanumeric(strLength);
}
/**
diff --git a/test/e2e/operation/pipeline/src/test/resources/env/scenario/general/opengauss.xml b/test/e2e/operation/pipeline/src/test/resources/env/scenario/general/opengauss.xml
new file mode 100644
index 00000000000..c7da6ef3ab4
--- /dev/null
+++ b/test/e2e/operation/pipeline/src/test/resources/env/scenario/general/opengauss.xml
@@ -0,0 +1,82 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<command>
+ <create-table-order>
+ create table test.t_order (
+ order_id bigint,
+ user_id integer,
+ status character varying(50),
+ c_int integer,
+ c_smallint smallint,
+ c_float real,
+ c_double double precision,
+ c_numeric numeric(10,2),
+ c_boolean boolean,
+ c_char character(32),
+ c_text text,
+ c_bytea bytea,
+ c_date date,
+ c_time time without time zone,
+ c_smalldatetime smalldatetime,
+ c_timestamp timestamp without time zone,
+ c_timestamptz timestamp with time zone,
+ c_interval interval,
+ c_array integer[],
+ c_json json,
+ c_jsonb jsonb,
+ c_uuid uuid,
+ c_hash32 hash32,
+ c_tsvector tsvector,
+ c_bit bit(4),
+ c_int4range int4range,
+ c_reltime reltime,
+ c_abstime abstime,
+ c_point point,
+ c_lseg lseg,
+ c_box box,
+ c_circle circle,
+ c_bitvarying bit varying(32),
+ c_cidr cidr,
+ c_inet inet,
+ c_macaddr macaddr,
+ c_hll hll(14,10,12,0),
+ PRIMARY KEY ( order_id )
+ );
+ </create-table-order>
+
+ <full-insert-order>
+ INSERT INTO test.t_order (
+ order_id, user_id, status, c_int, c_smallint, c_float, c_double, c_numeric, c_boolean, c_char, c_text, c_bytea, c_date, c_time,
+ c_smalldatetime, c_timestamp, c_timestamptz, c_interval, c_array, c_json, c_jsonb, c_uuid, c_hash32, c_tsvector, c_bit,
+ c_int4range, c_reltime, c_abstime, c_point, c_lseg, c_box, c_circle, c_bitvarying, c_cidr, c_inet, c_macaddr, c_hll
+ ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
+ </full-insert-order>
+
+ <create-table-order-item>
+ CREATE TABLE test.t_order_item (
+ item_id int8 NOT NULL,
+ order_id int8 NOT NULL,
+ user_id int4 NOT NULL,
+ status varchar(50),
+ PRIMARY KEY (item_id)
+ )
+ </create-table-order-item>
+
+ <full-insert-order-item>
+ INSERT INTO test.t_order_item(item_id,order_id,user_id,status) VALUES(?,?,?,?)
+ </full-insert-order-item>
+</command>