You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/02/24 02:02:43 UTC

[shardingsphere] branch master updated: Add PostgreSQL and openGauss estimated count SQL (#24321)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 32f7c8c9786 Add PostgreSQL and openGauss estimated count SQL  (#24321)
32f7c8c9786 is described below

commit 32f7c8c9786068484a365a20ca69104a4e6d568c
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Fri Feb 24 10:02:35 2023 +0800

    Add PostgreSQL and openGauss estimated count SQL  (#24321)
    
    * Add PostgreSQL and openGauss estimated count sql
    
    * Fix build insert SQL without unique keys of PostgreSQL
    
    * Improve show consistency check remaining_seconds
    
    * Throw unsupported exception when unique key is null at data match check
    
    * Add E2E test case
    
    * Improve inventory percentage of show job status
---
 .../impl/AbstractInventoryIncrementalJobAPIImpl.java  |  4 +++-
 .../DataMatchDataConsistencyCalculateAlgorithm.java   |  3 +++
 .../pipeline/core/prepare/InventoryTaskSplitter.java  | 13 +++++++++----
 .../sqlbuilder/OpenGaussPipelineSQLBuilder.java       |  4 ++--
 .../sqlbuilder/PostgreSQLPipelineSQLBuilder.java      | 11 ++++++++---
 .../api/impl/ConsistencyCheckJobAPI.java              |  2 +-
 .../migration/primarykey/IndexesMigrationE2EIT.java   | 19 ++++++++++++++-----
 7 files changed, 40 insertions(+), 16 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
index c95f79f5357..fa694640d81 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -119,7 +119,9 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
                 continue;
             }
             int inventoryFinishedPercentage = 0;
-            if (0 != jobItemProgress.getProcessedRecordsCount() && 0 != jobItemProgress.getInventoryRecordsCount()) {
+            if (JobStatus.EXECUTE_INCREMENTAL_TASK == jobItemProgress.getStatus()) {
+                inventoryFinishedPercentage = 100;
+            } else if (0 != jobItemProgress.getProcessedRecordsCount() && 0 != jobItemProgress.getInventoryRecordsCount()) {
                 inventoryFinishedPercentage = (int) Math.min(100, jobItemProgress.getProcessedRecordsCount() * 100 / jobItemProgress.getInventoryRecordsCount());
             }
             String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 482beb4b306..14a6a4ad919 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -174,6 +174,9 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
     }
     
     private String getQuerySQL(final DataConsistencyCalculateParameter param) {
+        if (null == param.getUniqueKey()) {
+            throw new UnsupportedOperationException("Data consistency of DATA_MATCH type not support table without unique key and primary key now");
+        }
         PipelineSQLBuilder sqlBuilder = PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, param.getDatabaseType());
         String logicTableName = param.getLogicTableName();
         String schemaName = param.getSchemaName();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 84aec4aa43d..ecd85d92b3d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -44,6 +44,9 @@ import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChanne
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -176,7 +179,8 @@ public final class InventoryTaskSplitter {
         Optional<String> sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
         try {
             if (sql.isPresent()) {
-                long result = getEstimatedCount(dataSource, sql.get());
+                DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, jobConfig.getSourceDatabaseType());
+                long result = getEstimatedCount(databaseType, dataSource, sql.get());
                 return result > 0 ? result : getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
             }
             return getCount(dataSource, pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
@@ -186,12 +190,13 @@ public final class InventoryTaskSplitter {
         }
     }
     
-    // TODO maybe need refactor after PostgreSQL support estimated count.
-    private long getEstimatedCount(final DataSource dataSource, final String estimatedCountSQL) throws SQLException {
+    private long getEstimatedCount(final DatabaseType databaseType, final DataSource dataSource, final String estimatedCountSQL) throws SQLException {
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = connection.prepareStatement(estimatedCountSQL)) {
-            preparedStatement.setString(1, connection.getCatalog());
+            if (databaseType instanceof MySQLDatabaseType) {
+                preparedStatement.setString(1, connection.getCatalog());
+            }
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
                 resultSet.next();
                 return resultSet.getLong(1);
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 332b89b3ea8..536d4824d01 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -89,8 +89,8 @@ public final class OpenGaussPipelineSQLBuilder extends AbstractPipelineSQLBuilde
     
     @Override
     public Optional<String> buildEstimatedCountSQL(final String schemaName, final String tableName) {
-        // TODO Support estimated count later.
-        return Optional.empty();
+        String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
+        return Optional.of(String.format("SELECT reltuples::integer FROM pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
     }
     
     @Override
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 3f9db6be99d..917463fa082 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -63,7 +63,12 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
     
     @Override
     public String buildInsertSQL(final String schemaName, final DataRecord dataRecord) {
-        return super.buildInsertSQL(schemaName, dataRecord) + buildConflictSQL(dataRecord);
+        String result = super.buildInsertSQL(schemaName, dataRecord);
+        // TODO Without a unique key, data may be duplicated if the job is interrupted
+        if (dataRecord.getUniqueKeyValue().isEmpty()) {
+            return result;
+        }
+        return result + buildConflictSQL(dataRecord);
     }
     
     // Refer to https://www.postgresql.org/docs/current/sql-insert.html
@@ -87,8 +92,8 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
     
     @Override
     public Optional<String> buildEstimatedCountSQL(final String schemaName, final String tableName) {
-        // TODO Support estimated count later.
-        return Optional.empty();
+        String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
+        return Optional.of(String.format("SELECT reltuples::integer FROM pg_class WHERE oid='%s'::regclass::oid;", qualifiedTableName));
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 6a506e6ca9a..b5221009179 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -297,7 +297,7 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
             if (null != stopTimeMillis) {
                 result.setCheckEndTime(DATE_TIME_FORMATTER.format(new Timestamp(stopTimeMillis).toLocalDateTime()));
             }
-            long remainingMills = (long) ((recordsCount - checkedRecordsCount) * 1.0D / checkedRecordsCount * durationMillis);
+            long remainingMills = Math.max(0, (long) ((recordsCount - checkedRecordsCount) * 1.0D / checkedRecordsCount * durationMillis));
             result.setRemainingSeconds(remainingMills / 1000);
         }
         String tableNames = jobItemProgress.getTableNames();
diff --git a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index af5498037be..a2e11733595 100644
--- a/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primary
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.database.type.dialect.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
@@ -70,11 +71,14 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         if (PipelineBaseE2EIT.ENV.getItEnvType() == PipelineEnvTypeEnum.NONE) {
             return result;
         }
-        List<String> versions = PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType());
-        if (versions.isEmpty()) {
-            return result;
+        List<String> mysqlVersion = PipelineBaseE2EIT.ENV.listStorageContainerImages(new MySQLDatabaseType());
+        if (!mysqlVersion.isEmpty()) {
+            result.add(new PipelineTestParameter(new MySQLDatabaseType(), mysqlVersion.get(0), "env/common/none.xml"));
+        }
+        List<String> postgresqlVersion = PipelineBaseE2EIT.ENV.listStorageContainerImages(new PostgreSQLDatabaseType());
+        if (!postgresqlVersion.isEmpty()) {
+            result.add(new PipelineTestParameter(new PostgreSQLDatabaseType(), postgresqlVersion.get(0), "env/common/none.xml"));
         }
-        result.add(new PipelineTestParameter(new MySQLDatabaseType(), versions.get(0), "env/common/none.xml"));
         return result;
     }
     
@@ -91,6 +95,9 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             sql = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT NULL, `user_id` INT NOT NULL, `status` varchar(255)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
             // DATA_MATCH doesn't supported, could not order by records
             consistencyCheckAlgorithmType = "CRC32_MATCH";
+        } else if (getDatabaseType() instanceof PostgreSQLDatabaseType) {
+            sql = "CREATE TABLE %s (order_id varchar(255) NOT NULL,user_id int NOT NULL,status varchar(255) NULL)";
+            consistencyCheckAlgorithmType = null;
         } else {
             return;
         }
@@ -160,7 +167,9 @@ public final class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         }
         assertProxyOrderRecordExist("t_order", primaryKey);
         waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-        assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
+        if (null != consistencyCheckAlgorithmType) {
+            assertCheckMigrationSuccess(jobId, consistencyCheckAlgorithmType);
+        }
         commitMigrationByJobId(jobId);
         proxyExecuteWithLog("REFRESH TABLE METADATA", 1);
         assertThat(getTargetTableRecordsCount(getSourceTableOrderName()), is(PipelineBaseE2EIT.TABLE_INIT_ROW_COUNT + 1));