You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/11/08 06:45:41 UTC
[shardingsphere] branch master updated: Improve column value reader for DATA_MATCH algorithm (#21993)
This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2a705efdee8 Improve column value reader for DATA_MATCH algorithm (#21993)
2a705efdee8 is described below
commit 2a705efdee8abac22eb833d30b65dc95a9cae4d5
Author: Hongsheng Zhong <zh...@apache.org>
AuthorDate: Tue Nov 8 14:45:30 2022 +0800
Improve column value reader for DATA_MATCH algorithm (#21993)
* Improve column value reader for DATA_MATCH algorithm
* Fix PostgreSQL table schema
* Use getFloat for FLOAT
* Be compatible with a schema that does not exist when dropping a table
---
.../type/DropTableStatementSchemaRefresher.java | 6 ++-
...DataMatchDataConsistencyCalculateAlgorithm.java | 21 ++--------
.../core/ingest/dumper/BasicColumnValueReader.java | 46 +++++++++++++++++++++-
.../ingest/PostgreSQLColumnValueReader.java | 10 +++--
.../resources/env/scenario/general/postgresql.xml | 4 +-
5 files changed, 60 insertions(+), 27 deletions(-)
diff --git a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/DropTableStatementSchemaRefresher.java b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/DropTableStatementSchemaRefresher.java
index 1aec0b4eed6..8f2f02f2c03 100644
--- a/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/DropTableStatementSchemaRefresher.java
+++ b/infra/context/src/main/java/org/apache/shardingsphere/infra/context/refresher/type/DropTableStatementSchemaRefresher.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.infra.context.refresher.type;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.context.refresher.MetaDataRefresher;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.database.schema.event.MetaDataRefreshedEvent;
import org.apache.shardingsphere.infra.metadata.database.schema.event.SchemaAlteredEvent;
import org.apache.shardingsphere.infra.rule.identifier.type.MutableDataNodeRule;
@@ -39,7 +40,10 @@ public final class DropTableStatementSchemaRefresher implements MetaDataRefreshe
final String schemaName, final DropTableStatement sqlStatement, final ConfigurationProperties props) {
SchemaAlteredEvent event = new SchemaAlteredEvent(database.getName(), schemaName);
sqlStatement.getTables().forEach(each -> {
- database.getSchema(schemaName).removeTable(each.getTableName().getIdentifier().getValue());
+ ShardingSphereSchema schema = database.getSchema(schemaName);
+ if (null != schema) {
+ schema.removeTable(each.getTableName().getIdentifier().getValue());
+ }
event.getDroppedTables().add(each.getTableName().getIdentifier().getValue());
});
Collection<MutableDataNodeRule> rules = database.getRuleMetaData().findRules(MutableDataNodeRule.class);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 3bd1b0e7d0e..3538035596f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -46,7 +46,6 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
@@ -197,7 +196,9 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
}
Iterator<Object> thisNextIterator = thisNext.iterator();
Iterator<Object> thatNextIterator = thatNext.iterator();
+ int columnIndex = 0;
while (thisNextIterator.hasNext() && thatNextIterator.hasNext()) {
+ ++columnIndex;
Object thisResult = thisNextIterator.next();
Object thatResult = thatNextIterator.next();
boolean matched;
@@ -205,14 +206,11 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
matched = ((SQLXML) thisResult).getString().equals(((SQLXML) thatResult).getString());
} else if (thisResult instanceof BigDecimal && thatResult instanceof BigDecimal) {
matched = DataConsistencyCheckUtils.isBigDecimalEquals((BigDecimal) thisResult, (BigDecimal) thatResult);
- } else if (thisResult instanceof Number && thatResult instanceof Number && thisResult.getClass() != thatResult.getClass()) {
- // TODO some numeric types, Proxy and use jdbc to get different values, eg, PostgreSQL int2, MySQL unsigned mediumint
- matched = checkDifferentNumberTypeMatched((Number) thisResult, (Number) thatResult);
} else {
matched = new EqualsBuilder().append(thisResult, thatResult).isEquals();
}
if (!matched) {
- log.warn("record column value not match, value1={}, value2={}, value1.class={}, value2.class={}, record1={}, record2={}", thisResult, thatResult,
+ log.warn("record column value not match, columnIndex={}, value1={}, value2={}, value1.class={}, value2.class={}, record1={}, record2={}", columnIndex, thisResult, thatResult,
null != thisResult ? thisResult.getClass().getName() : "", null != thatResult ? thatResult.getClass().getName() : "",
thisNext, thatNext);
return false;
@@ -222,19 +220,6 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
return true;
}
- private boolean checkDifferentNumberTypeMatched(final Number thisResult, final Number thatResult) {
- if (thisResult instanceof Integer) {
- return thisResult.intValue() == thatResult.intValue();
- }
- if (thisResult instanceof Long) {
- return thisResult.longValue() == thatResult.longValue();
- }
- if (thisResult instanceof Short) {
- return thisResult.longValue() == thatResult.longValue();
- }
- return Objects.equals(thisResult, thatResult);
- }
-
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37).append(getMaxUniqueKeyValue()).append(getRecordsCount()).append(getRecords()).toHashCode();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
index 735e0d245c7..90af92d8b51 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/BasicColumnValueReader.java
@@ -30,15 +30,57 @@ import java.sql.Types;
public class BasicColumnValueReader implements ColumnValueReader {
@Override
- public Object readValue(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final int columnIndex) throws SQLException {
- int columnType = resultSet.getMetaData().getColumnType(columnIndex);
+ public Object readValue(final ResultSet resultSet, final ResultSetMetaData metaData, final int columnIndex) throws SQLException {
+ Object result = readValue0(resultSet, metaData, columnIndex);
+ return resultSet.wasNull() ? null : result;
+ }
+
+ private Object readValue0(final ResultSet resultSet, final ResultSetMetaData metaData, final int columnIndex) throws SQLException {
+ int columnType = metaData.getColumnType(columnIndex);
switch (columnType) {
+ case Types.BOOLEAN:
+ return resultSet.getBoolean(columnIndex);
+ case Types.TINYINT:
+ return metaData.isSigned(columnIndex) ? resultSet.getByte(columnIndex) : resultSet.getShort(columnIndex);
+ case Types.SMALLINT:
+ return metaData.isSigned(columnIndex) ? resultSet.getShort(columnIndex) : resultSet.getInt(columnIndex);
+ case Types.INTEGER:
+ return metaData.isSigned(columnIndex) ? resultSet.getInt(columnIndex) : resultSet.getLong(columnIndex);
+ case Types.BIGINT:
+ return metaData.isSigned(columnIndex) ? resultSet.getLong(columnIndex) : resultSet.getBigDecimal(columnIndex);
+ case Types.NUMERIC:
+ case Types.DECIMAL:
+ return resultSet.getBigDecimal(columnIndex);
+ case Types.REAL:
+ case Types.FLOAT:
+ return resultSet.getFloat(columnIndex);
+ case Types.DOUBLE:
+ return resultSet.getDouble(columnIndex);
case Types.TIME:
return resultSet.getTime(columnIndex);
case Types.DATE:
return resultSet.getDate(columnIndex);
case Types.TIMESTAMP:
return resultSet.getTimestamp(columnIndex);
+ case Types.CHAR:
+ case Types.VARCHAR:
+ case Types.LONGVARCHAR:
+ case Types.NCHAR:
+ case Types.NVARCHAR:
+ case Types.LONGNVARCHAR:
+ return resultSet.getString(columnIndex);
+ case Types.BINARY:
+ case Types.VARBINARY:
+ case Types.LONGVARBINARY:
+ return resultSet.getBytes(columnIndex);
+ case Types.CLOB:
+ return resultSet.getClob(columnIndex);
+ case Types.NCLOB:
+ return resultSet.getNClob(columnIndex);
+ case Types.BLOB:
+ return resultSet.getBlob(columnIndex);
+ case Types.ARRAY:
+ return resultSet.getArray(columnIndex);
default:
return resultSet.getObject(columnIndex);
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
index 2bdd5b5bd7f..4cded45e0a6 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLColumnValueReader.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.BasicColumnValueReader;
import org.postgresql.util.PGobject;
+import java.math.BigDecimal;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -41,14 +42,15 @@ public final class PostgreSQLColumnValueReader extends BasicColumnValueReader {
@Override
public Object readValue(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final int columnIndex) throws SQLException {
if (isPgMoneyType(resultSetMetaData, columnIndex)) {
- return resultSet.getBigDecimal(columnIndex);
+ BigDecimal result = resultSet.getBigDecimal(columnIndex);
+ return resultSet.wasNull() ? null : result;
}
if (isPgBitType(resultSetMetaData, columnIndex)) {
PGobject result = new PGobject();
result.setType("bit");
- Object resultSetObject = resultSet.getObject(columnIndex);
- result.setValue(null == resultSetObject ? null : (Boolean) resultSetObject ? "1" : "0");
- return result;
+ Object bitValue = resultSet.getObject(columnIndex);
+ result.setValue(null == bitValue ? null : (Boolean) bitValue ? "1" : "0");
+ return resultSet.wasNull() ? null : result;
}
return super.readValue(resultSet, resultSetMetaData, columnIndex);
}
diff --git a/test/integration-test/scaling/src/test/resources/env/scenario/general/postgresql.xml b/test/integration-test/scaling/src/test/resources/env/scenario/general/postgresql.xml
index 0d99e0c15a5..175c963912e 100644
--- a/test/integration-test/scaling/src/test/resources/env/scenario/general/postgresql.xml
+++ b/test/integration-test/scaling/src/test/resources/env/scenario/general/postgresql.xml
@@ -26,8 +26,8 @@
t_bytea bytea NULL,
t_char char(10) NULL,
t_varchar varchar(128) NULL,
- t_float float8 NULL,
- t_double float4 NULL,
+ t_float float4 NULL,
+ t_double float8 NULL,
t_json json NULL,
t_jsonb jsonb NULL,
t_text TEXT NULL,