You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/02/24 02:02:43 UTC
[shardingsphere] branch master updated: Add PostgreSQL and openGauss estimated count SQL (#24321)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 32f7c8c9786 Add PostgreSQL and openGauss estimated count SQL (#24321)
32f7c8c9786 is described below
commit 32f7c8c9786068484a365a20ca69104a4e6d568c
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Fri Feb 24 10:02:35 2023 +0800
Add PostgreSQL and openGauss estimated count SQL (#24321)
* Add PostgreSQL and openGauss estimated count SQL
* Fix building INSERT SQL for PostgreSQL tables without unique keys
* Prevent negative remaining_seconds in SHOW consistency check status
* Throw UnsupportedOperationException when the unique key is null in the DATA_MATCH consistency check
* Add E2E test case
* Report 100% inventory progress in SHOW job status once the incremental task is executing
---
.../impl/AbstractInventoryIncrementalJobAPIImpl.java | 4 +++-
.../DataMatchDataConsistencyCalculateAlgorithm.java | 3 +++
.../pipeline/core/prepare/InventoryTaskSplitter.java | 13 +++++++++----
.../sqlbuilder/OpenGaussPipelineSQLBuilder.java | 4 ++--
.../sqlbuilder/PostgreSQLPipelineSQLBuilder.java | 11 ++++++++---
.../api/impl/ConsistencyCheckJobAPI.java | 2 +-
.../migration/primarykey/IndexesMigrationE2EIT.java | 19 ++++++++++++++-----
7 files changed, 40 insertions(+), 16 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index c95f79f5357..fa694640d81 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -119,7 +119,9 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
continue;
}
int inventoryFinishedPercentage = 0;
- if (0 != jobItemProgress.getProcessedRecordsCount() && 0 != jobItemProgress.getInventoryRecordsCount()) {
+ if (JobStatus.EXECUTE_INCREMENTAL_TASK == jobItemProgress.getStatus()) {
+ inventoryFinishedPercentage = 100;
+ } else if (0 != jobItemProgress.getProcessedRecordsCount() && 0 != jobItemProgress.getInventoryRecordsCount()) {
inventoryFinishedPercentage = (int) Math.min(100, jobItemProgress.getProcessedRecordsCount() * 100 / jobItemProgress.getInventoryRecordsCount());
}
String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 482beb4b306..14a6a4ad919 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -174,6 +174,9 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
}
private String getQuerySQL(final DataConsistencyCalculateParameter param) {
+ if (null == param.getUniqueKey()) {
+ throw new UnsupportedOperationException("Data consistency of DATA_MATCH type not support table without unique key and primary key now");
+ }
PipelineSQLBuilder sqlBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, param.getDatabaseType());
String logicTableName = param.getLogicTableName();
String schemaName = param.getSchemaName();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 84aec4aa43d..ecd85d92b3d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -44,6 +44,9 @@ import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChanne
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -176,7 +179,8 @@ public final class InventoryTaskSplitter {
Optional<String> sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
try {
if (sql.isPresent()) {
- long result = getEstimatedCount(dataSource, sql.get());
+ DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, jobConfig.getSourceDatabaseType());
+ long result = getEstimatedCount(databaseType, dataSource, sql.get());
return result > 0 ? result : getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
}
return getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
@@ -186,12 +190,13 @@ public final class InventoryTaskSplitter {
}
}
- // TODO maybe need refactor after PostgreSQL support estimated count.
- private long getEstimatedCount(final DataSource dataSource, final String estimatedCountSQL) throws SQLException {
+ private long getEstimatedCount(final DatabaseType databaseType, final DataSource dataSource, final String estimatedCountSQL) throws SQLException {
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(estimatedCountSQL)) {
- preparedStatement.setString(1, connection.getCatalog());
+ if (databaseType instanceof MySQLDatabaseType) {
+ preparedStatement.setString(1, connection.getCatalog());
+ }
try (ResultSet resultSet = preparedStatement.executeQuery()) {
resultSet.next();
return resultSet.getLong(1);
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 332b89b3ea8..536d4824d01 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -89,8 +89,8 @@ public final class OpenGaussPipelineSQLBuilder extends AbstractPipelineSQLBuilde
@Override
public Optional<String> buildEstimatedCountSQL(final String schemaName, final String tableName) {
- // TODO Support estimated count later.
- return Optional.empty();
+ String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
+ return Optional.of(String.format("SELECT reltuples::integer FROM pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
}
@Override
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 3f9db6be99d..917463fa082 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -63,7 +63,12 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
@Override
public String buildInsertSQL(final String schemaName, final DataRecord dataRecord) {
- return super.buildInsertSQL(schemaName, dataRecord) + buildConflictSQL(dataRecord);
+ String result = super.buildInsertSQL(schemaName, dataRecord);
+ // TODO without unique key, job has been interrupted, which may lead to data duplication
+ if (dataRecord.getUniqueKeyValue().isEmpty()) {
+ return result;
+ }
+ return result + buildConflictSQL(dataRecord);
}
// Refer to https://www.postgresql.org/docs/current/sql-insert.html
@@ -87,8 +92,8 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
@Override
public Optional<String> buildEstimatedCountSQL(final String schemaName, final String tableName) {
- // TODO Support estimated count later.
- return Optional.empty();
+ String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
+ return Optional.of(String.format("SELECT reltuples::integer FROM pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
}
@Override
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 6a506e6ca9a..b5221009179 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -297,7 +297,7 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
if (null != stopTimeMillis) {
result.setCheckEndTime(DATE_TIME_FORMATTER.format(new Timestamp(stopTimeMillis).toLocalDateTime()));
}
- long remainingMills = (long) ((recordsCount - checkedRecordsCount) * 1.0D / checkedRecordsCount * durationMillis);
+ long remainingMills = Math.max(0, (long) ((recordsCount - checkedRecordsCount) * 1.0D / checkedRecordsCount * durationMillis));
result.setRemainingSeconds(remainingMills / 1000);
}
String tableNames = jobItemProgress.getTableNames();
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index af5498037be..a2e11733595 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primary
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
@@ -70,11 +71,14 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
return result;
}
- List<String> versions = PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType());
- if (versions.isEmpty()) {
- return result;
+ List<String> mysqlVersion = PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType());
+ if (!mysqlVersion.isEmpty()) {
+ result.add(new PipelineTestParameter(new MySQLDatabaseType(), mysqlVersion.get(0), "env/common/none.xml"));
+ }
+ List<String> postgresqlVersion = PipelineBaseE2EIT.ENV.listStorageContainerImages(new PostgreSQLDatabaseType());
+ if (!postgresqlVersion.isEmpty()) {
+ result.add(new PipelineTestParameter(new PostgreSQLDatabaseType(), postgresqlVersion.get(0), "env/common/none.xml"));
}
- result.add(new PipelineTestParameter(new MySQLDatabaseType(), versions.get(0), "env/common/none.xml"));
return result;
}
@@ -91,6 +95,9 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
// DATA_MATCH doesn't supported, could not order by records
consistencyCheckAlgorithmType = "CRC32_MATCH";
+ } else if (getDatabaseType() instanceof PostgreSQLDatabaseType) {
+ sql = "CREATE TABLE %s (order_id varchar(255) NOT NULL,user_id int NOT NULL,status varchar(255) NULL)";
+ consistencyCheckAlgorithmType = null;
} else {
return;
}
@@ -160,7 +167,9 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
}
assertProxyOrderRecordExist("t_order", primaryKey);
waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
- assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
+ if (null != consistencyCheckAlgorithmType) {
+ assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
+ }
commitMigrationByJobId(jobId);
proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
assertThat(getTargetTableRecordsCount(getSourceTableOrderName()), is(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT + 1));