You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/04/21 11:46:00 UTC
[shardingsphere] branch master updated: Fix delete event before values is null at pipeline DataRecord (#25257)
This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3410736bc39 Fix delete event before values is null at pipeline DataRecord (#25257)
3410736bc39 is described below
commit 3410736bc395664b2a8c6bc1f3fc570b5b290d14
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Fri Apr 21 19:45:44 2023 +0800
Fix delete event before values is null at pipeline DataRecord (#25257)
* Fix delete event before values is null at pipeline DataRecord
* Revert "Fix delete event before values is null at pipeline DataRecord"
This reverts commit cd5506a1d0461e1c299ec59c21e783a709b6080b.
* Fix delete event's before value being null
* Fix CI error
* Fix DataSourceImporterTest unit test
---
.../pipeline/core/importer/DataRecordMerger.java | 4 +-
.../pipeline/core/importer/DataSourceImporter.java | 2 +-
.../core/importer/DataRecordMergerTest.java | 6 +--
.../mysql/ingest/MySQLIncrementalDumper.java | 2 +-
.../postgresql/ingest/wal/WALEventConverter.java | 2 +-
.../cases/cdc/DataSourceRecordConsumer.java | 5 ++-
.../core/importer/DataSourceImporterTest.java | 43 +++++++++++++++-------
7 files changed, 41 insertions(+), 23 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
index 2aec38c6264..9b7ba2663e4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
@@ -119,7 +119,7 @@ public final class DataRecordMerger {
}
private void mergeDelete(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
- DataRecord beforeDataRecord = dataRecords.get(dataRecord.getKey());
+ DataRecord beforeDataRecord = dataRecords.get(dataRecord.getOldKey());
ShardingSpherePreconditions.checkState(null == beforeDataRecord
|| !IngestDataChangeType.DELETE.equals(beforeDataRecord.getType()), () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
if (null != beforeDataRecord && IngestDataChangeType.UPDATE.equals(beforeDataRecord.getType()) && checkUpdatedPrimaryKey(beforeDataRecord)) {
@@ -133,7 +133,7 @@ public final class DataRecordMerger {
dataRecords.remove(beforeDataRecord.getKey());
dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
} else {
- dataRecords.put(dataRecord.getKey(), dataRecord);
+ dataRecords.put(dataRecord.getOldKey(), dataRecord);
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index 9cac9104192..0c0e20b78a9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -242,7 +242,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
for (DataRecord each : dataRecords) {
conditionColumns = RecordUtils.extractConditionColumns(each, importerConfig.getShardingColumns(each.getTableName()));
for (int i = 0; i < conditionColumns.size(); i++) {
- preparedStatement.setObject(i + 1, conditionColumns.get(i).getValue());
+ preparedStatement.setObject(i + 1, conditionColumns.get(i).getOldValue());
}
preparedStatement.addBatch();
}
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
index 208fab4060b..fa8c7c5ca33 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
@@ -274,9 +274,9 @@ class DataRecordMergerTest {
DataRecord result = new DataRecord(new PlaceholderPosition(), 3);
result.setType(IngestDataChangeType.DELETE);
result.setTableName(tableName);
- result.addColumn(new Column("id", id, true, true));
- result.addColumn(new Column("user_id", userId, true, false));
- result.addColumn(new Column("total_price", totalPrice, true, false));
+ result.addColumn(new Column("id", id, null, true, true));
+ result.addColumn(new Column("user_id", userId, null, true, false));
+ result.addColumn(new Column("total_price", totalPrice, null, true, false));
return result;
}
}
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 0f1cb52891a..7a511f419d8 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -188,7 +188,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
continue;
}
- record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, columnMetaData.isUniqueKey()));
+ record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), null, true, columnMetaData.isUniqueKey()));
}
channel.pushRecord(record);
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index de8127d40df..c9be2e60f2f 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -117,7 +117,7 @@ public final class WALEventConverter {
// TODO Unique key may be a column within unique index
List<String> primaryKeyColumns = tableMetaData.getPrimaryKeyColumns();
for (int i = 0; i < event.getPrimaryKeys().size(); i++) {
- result.addColumn(new Column(primaryKeyColumns.get(i), event.getPrimaryKeys().get(i), true, true));
+ result.addColumn(new Column(primaryKeyColumns.get(i), event.getPrimaryKeys().get(i), null, true, true));
}
return result;
}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
index f84e16327f8..c21aa60fcd3 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
@@ -125,8 +125,9 @@ public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
}
break;
case DELETE:
- Object orderId = convertValueFromAny(tableMetaData, afterMap.get("order_id"));
- preparedStatement.setObject(1, orderId);
+ TableColumn orderId = record.getBeforeList().stream().filter(each -> "order_id".equals(each.getName())).findFirst().orElseThrow(() ->
+ new UnsupportedOperationException("No primary key found in the t_order"));
+ preparedStatement.setObject(1, convertValueFromAny(tableMetaData, orderId));
preparedStatement.execute();
break;
default:
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
index 4db635f2e62..1620b0be8b0 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
@@ -32,7 +32,6 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.core.importer.DataSourceImporter;
import org.apache.shardingsphere.data.pipeline.core.importer.connector.DataSourceImporterConnector;
-import org.apache.shardingsphere.data.pipeline.core.record.RecordUtils;
import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
import org.junit.jupiter.api.BeforeEach;
@@ -45,7 +44,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -120,10 +118,10 @@ class DataSourceImporterTest {
when(connection.prepareStatement(any())).thenReturn(preparedStatement);
when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(updateRecord));
jdbcImporter.run();
- verify(preparedStatement).setObject(1, 10);
+ verify(preparedStatement).setObject(1, 20);
verify(preparedStatement).setObject(2, "UPDATE");
verify(preparedStatement).setObject(3, 1);
- verify(preparedStatement).setObject(4, 10);
+ verify(preparedStatement).setObject(4, 20);
verify(preparedStatement).executeUpdate();
}
@@ -147,15 +145,11 @@ class DataSourceImporterTest {
result.setTableName(TABLE_NAME);
result.setType("UPDATE");
result.addColumn(new Column("id", 1, 2, true, true));
- result.addColumn(new Column("user", 10, true, false));
- result.addColumn(new Column("status", "UPDATE", true, false));
+ result.addColumn(new Column("user", 0, 10, true, false));
+ result.addColumn(new Column("status", null, "UPDATE", true, false));
return result;
}
- private Collection<Column> mockConditionColumns(final DataRecord dataRecord) {
- return RecordUtils.extractConditionColumns(dataRecord, Collections.singleton("user"));
- }
-
private List<Record> mockRecords(final DataRecord dataRecord) {
List<Record> result = new LinkedList<>();
result.add(dataRecord);
@@ -167,9 +161,32 @@ class DataSourceImporterTest {
DataRecord result = new DataRecord(new PlaceholderPosition(), 3);
result.setTableName(TABLE_NAME);
result.setType(recordType);
- result.addColumn(new Column("id", 1, false, true));
- result.addColumn(new Column("user", 10, true, false));
- result.addColumn(new Column("status", recordType, true, false));
+ Integer idOldValue = null;
+ Integer userOldValue = null;
+ Integer idValue = null;
+ Integer userValue = null;
+ String statusOldValue = null;
+ String statusValue = null;
+ if ("INSERT".equals(recordType)) {
+ idValue = 1;
+ userValue = 10;
+ statusValue = recordType;
+ }
+ if ("UPDATE".equals(recordType)) {
+ idOldValue = 1;
+ idValue = idOldValue;
+ userOldValue = 10;
+ userValue = 20;
+ statusValue = recordType;
+ }
+ if ("DELETE".equals(recordType)) {
+ idOldValue = 1;
+ userOldValue = 10;
+ statusOldValue = recordType;
+ }
+ result.addColumn(new Column("id", idOldValue, idValue, false, true));
+ result.addColumn(new Column("user", userOldValue, userValue, true, false));
+ result.addColumn(new Column("status", statusOldValue, statusValue, true, false));
return result;
}