You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/11/08 06:45:41 UTC

[shardingsphere] branch master updated: Improve column value reader for DATA_MATCH algorithm (#21993)

This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a705efdee8 Improve column value reader for DATA_MATCH algorithm (#21993)
2a705efdee8 is described below

commit 2a705efdee8abac22eb833d30b65dc95a9cae4d5
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Tue Nov 8 14:45:30 2022 +0800

    Improve column value reader for DATA_MATCH algorithm (#21993)
    
    * Improve column value reader for DATA_MATCH algorithm
    
    * Fix PostgreSQL table schema
    
    * Use getFloat for FLOAT
    
    * Be compatible with a non-existent schema when dropping a table
---
 .../type/DropTableStatementSchemaRefresher.java    |  6 ++-
 ...DataMatchDataConsistencyCalculateAlgorithm.java | 21 ++--------
 .../core/ingest/dumper/BasicColumnValueReader.java | 46 +++++++++++++++++++++-
 .../ingest/PostgreSQLColumnValueReader.java        | 10 +++--
 .../resources/env/scenario/general/postgresql.xml  |  4 +-
 5 files changed, 60 insertions(+), 27 deletions(-)

diff --git a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/DropTableStatementSchemaRefresher.java b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/DropTableStatementSchemaRefresher.java
index 1aec0b4eed6..8f2f02f2c03 100644
--- a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/DropTableStatementSchemaRefresher.java
+++ b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/DropTableStatementSchemaRefresher.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.infra.context.refresher.type;
 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
 import org.apache.shardingsphere.infra.context.refresher.MetaDataRefresher;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.metadata.database.schema.event.MetaDataRefreshedEvent;
 import org.apache.shardingsphere.infra.metadata.database.schema.event.SchemaAlteredEvent;
 import org.apache.shardingsphere.infra.rule.identifier.type.MutableDataNodeRule;
@@ -39,7 +40,10 @@ public final class DropTableStatementSchemaRefresher implements MetaDataRefreshe
                                                     final String schemaName, final DropTableStatement sqlStatement, final ConfigurationProperties props) {
         SchemaAlteredEvent event = new SchemaAlteredEvent(database.getName(), schemaName);
         sqlStatement.getTables().forEach(each -> {
-            database.getSchema(schemaName).removeTable(each.getTableName().getIdentifier().getValue());
+            ShardingSphereSchema schema = database.getSchema(schemaName);
+            if (null != schema) {
+                schema.removeTable(each.getTableName().getIdentifier().getValue());
+            }
             event.getDroppedTables().add(each.getTableName().getIdentifier().getValue());
         });
         Collection<MutableDataNodeRule> rules = database.getRuleMetaData().findRules(MutableDataNodeRule.class);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 3bd1b0e7d0e..3538035596f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -46,7 +46,6 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
@@ -197,7 +196,9 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
                 }
                 Iterator<Object> thisNextIterator = thisNext.iterator();
                 Iterator<Object> thatNextIterator = thatNext.iterator();
+                int columnIndex = 0;
                 while (thisNextIterator.hasNext() && thatNextIterator.hasNext()) {
+                    ++columnIndex;
                     Object thisResult = thisNextIterator.next();
                     Object thatResult = thatNextIterator.next();
                     boolean matched;
@@ -205,14 +206,11 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
                         matched = ((SQLXML) thisResult).getString().equals(((SQLXML) thatResult).getString());
                     } else if (thisResult instanceof BigDecimal && thatResult instanceof BigDecimal) {
                         matched = DataConsistencyCheckUtils.isBigDecimalEquals((BigDecimal) thisResult, (BigDecimal) thatResult);
-                    } else if (thisResult instanceof Number && thatResult instanceof Number && thisResult.getClass() != thatResult.getClass()) {
-                        // TODO some numeric types, Proxy and use jdbc to get different values, eg, PostgreSQL int2, MySQL unsigned mediumint
-                        matched = checkDifferentNumberTypeMatched((Number) thisResult, (Number) thatResult);
                     } else {
                         matched = new EqualsBuilder().append(thisResult, thatResult).isEquals();
                     }
                     if (!matched) {
-                        log.warn("record column value not match, value1={}, value2={}, value1.class={}, value2.class={}, record1={}, record2={}", thisResult, thatResult,
+                        log.warn("record column value not match, columnIndex={}, value1={}, value2={}, value1.class={}, value2.class={}, record1={}, record2={}", columnIndex, thisResult, thatResult,
                                 null != thisResult ? thisResult.getClass().getName() : "", null != thatResult ? thatResult.getClass().getName() : "",
                                 thisNext, thatNext);
                         return false;
@@ -222,19 +220,6 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
             return true;
         }
         
-        private boolean checkDifferentNumberTypeMatched(final Number thisResult, final Number thatResult) {
-            if (thisResult instanceof Integer) {
-                return thisResult.intValue() == thatResult.intValue();
-            }
-            if (thisResult instanceof Long) {
-                return thisResult.longValue() == thatResult.longValue();
-            }
-            if (thisResult instanceof Short) {
-                return thisResult.longValue() == thatResult.longValue();
-            }
-            return Objects.equals(thisResult, thatResult);
-        }
-        
         @Override
         public int hashCode() {
             return new HashCodeBuilder(17, 37).append(getMaxUniqueKeyValue()).append(getRecordsCount()).append(getRecords()).toHashCode();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
index 735e0d245c7..90af92d8b51 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
@@ -30,15 +30,57 @@ import java.sql.Types;
 public class BasicColumnValueReader implements ColumnValueReader {
     
     @Override
-    public Object readValue(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final int columnIndex) throws SQLException {
-        int columnType = resultSet.getMetaData().getColumnType(columnIndex);
+    public Object readValue(final ResultSet resultSet, final ResultSetMetaData metaData, final int columnIndex) throws SQLException {
+        Object result = readValue0(resultSet, metaData, columnIndex);
+        return resultSet.wasNull() ? null : result;
+    }
+    
+    private Object readValue0(final ResultSet resultSet, final ResultSetMetaData metaData, final int columnIndex) throws SQLException {
+        int columnType = metaData.getColumnType(columnIndex);
         switch (columnType) {
+            case Types.BOOLEAN:
+                return resultSet.getBoolean(columnIndex);
+            case Types.TINYINT:
+                return metaData.isSigned(columnIndex) ? resultSet.getByte(columnIndex) : resultSet.getShort(columnIndex);
+            case Types.SMALLINT:
+                return metaData.isSigned(columnIndex) ? resultSet.getShort(columnIndex) : resultSet.getInt(columnIndex);
+            case Types.INTEGER:
+                return metaData.isSigned(columnIndex) ? resultSet.getInt(columnIndex) : resultSet.getLong(columnIndex);
+            case Types.BIGINT:
+                return metaData.isSigned(columnIndex) ? resultSet.getLong(columnIndex) : resultSet.getBigDecimal(columnIndex);
+            case Types.NUMERIC:
+            case Types.DECIMAL:
+                return resultSet.getBigDecimal(columnIndex);
+            case Types.REAL:
+            case Types.FLOAT:
+                return resultSet.getFloat(columnIndex);
+            case Types.DOUBLE:
+                return resultSet.getDouble(columnIndex);
             case Types.TIME:
                 return resultSet.getTime(columnIndex);
             case Types.DATE:
                 return resultSet.getDate(columnIndex);
             case Types.TIMESTAMP:
                 return resultSet.getTimestamp(columnIndex);
+            case Types.CHAR:
+            case Types.VARCHAR:
+            case Types.LONGVARCHAR:
+            case Types.NCHAR:
+            case Types.NVARCHAR:
+            case Types.LONGNVARCHAR:
+                return resultSet.getString(columnIndex);
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+                return resultSet.getBytes(columnIndex);
+            case Types.CLOB:
+                return resultSet.getClob(columnIndex);
+            case Types.NCLOB:
+                return resultSet.getNClob(columnIndex);
+            case Types.BLOB:
+                return resultSet.getBlob(columnIndex);
+            case Types.ARRAY:
+                return resultSet.getArray(columnIndex);
             default:
                 return resultSet.getObject(columnIndex);
         }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
index 2bdd5b5bd7f..4cded45e0a6 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.BasicColumnValueReader;
 import org.postgresql.util.PGobject;
 
+import java.math.BigDecimal;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -41,14 +42,15 @@ public final class PostgreSQLColumnValueReader extends BasicColumnValueReader {
     @Override
     public Object readValue(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final int columnIndex) throws SQLException {
         if (isPgMoneyType(resultSetMetaData, columnIndex)) {
-            return resultSet.getBigDecimal(columnIndex);
+            BigDecimal result = resultSet.getBigDecimal(columnIndex);
+            return resultSet.wasNull() ? null : result;
         }
         if (isPgBitType(resultSetMetaData, columnIndex)) {
             PGobject result = new PGobject();
             result.setType("bit");
-            Object resultSetObject = resultSet.getObject(columnIndex);
-            result.setValue(null == resultSetObject ? null : (Boolean) resultSetObject ? "1" : "0");
-            return result;
+            Object bitValue = resultSet.getObject(columnIndex);
+            result.setValue(null == bitValue ? null : (Boolean) bitValue ? "1" : "0");
+            return resultSet.wasNull() ? null : result;
         }
         return super.readValue(resultSet, resultSetMetaData, columnIndex);
     }
diff --git a/test/integration-test/scaling/src/test/resources/env/scenario/general/postgresql.xml b/test/integration-test/scaling/src/test/resources/env/scenario/general/postgresql.xml
index 0d99e0c15a5..175c963912e 100644
--- a/test/integration-test/scaling/src/test/resources/env/scenario/general/postgresql.xml
+++ b/test/integration-test/scaling/src/test/resources/env/scenario/general/postgresql.xml
@@ -26,8 +26,8 @@
         t_bytea bytea NULL,
         t_char char(10) NULL,
         t_varchar varchar(128) NULL,
-        t_float float8 NULL,
-        t_double float4 NULL,
+        t_float float4 NULL,
+        t_double float8 NULL,
         t_json json NULL,
         t_jsonb jsonb NULL,
         t_text TEXT NULL,