You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2023/04/21 11:46:00 UTC

[shardingsphere] branch master updated: Fix delete event before values is null at pipeline DataRecord (#25257)

This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3410736bc39 Fix delete event before values is null at pipeline DataRecord (#25257)
3410736bc39 is described below

commit 3410736bc395664b2a8c6bc1f3fc570b5b290d14
Author: Xinze Guo <10...@users.noreply.github.com>
AuthorDate: Fri Apr 21 19:45:44 2023 +0800

    Fix delete event before values is null at pipeline DataRecord (#25257)
    
    * Fix delete event before values is null at pipeline DataRecord
    
    * Revert "Fix delete event before values is null at pipeline DataRecord"
    
    This reverts commit cd5506a1d0461e1c299ec59c21e783a709b6080b.
    
    * Fix delete event's before value being null
    
    * Fix CI error
    
    * Fix DataSourceImporterTest unit test
---
 .../pipeline/core/importer/DataRecordMerger.java   |  4 +-
 .../pipeline/core/importer/DataSourceImporter.java |  2 +-
 .../core/importer/DataRecordMergerTest.java        |  6 +--
 .../mysql/ingest/MySQLIncrementalDumper.java       |  2 +-
 .../postgresql/ingest/wal/WALEventConverter.java   |  2 +-
 .../cases/cdc/DataSourceRecordConsumer.java        |  5 ++-
 .../core/importer/DataSourceImporterTest.java      | 43 +++++++++++++++-------
 7 files changed, 41 insertions(+), 23 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
index 2aec38c6264..9b7ba2663e4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMerger.java
@@ -119,7 +119,7 @@ public final class DataRecordMerger {
     }
     
     private void mergeDelete(final DataRecord dataRecord, final Map<DataRecord.Key, DataRecord> dataRecords) {
-        DataRecord beforeDataRecord = dataRecords.get(dataRecord.getKey());
+        DataRecord beforeDataRecord = dataRecords.get(dataRecord.getOldKey());
         ShardingSpherePreconditions.checkState(null == beforeDataRecord
                 || !IngestDataChangeType.DELETE.equals(beforeDataRecord.getType()), () -> new PipelineUnexpectedDataRecordOrderException(beforeDataRecord, dataRecord));
         if (null != beforeDataRecord && IngestDataChangeType.UPDATE.equals(beforeDataRecord.getType()) && checkUpdatedPrimaryKey(beforeDataRecord)) {
@@ -133,7 +133,7 @@ public final class DataRecordMerger {
             dataRecords.remove(beforeDataRecord.getKey());
             dataRecords.put(mergedDataRecord.getKey(), mergedDataRecord);
         } else {
-            dataRecords.put(dataRecord.getKey(), dataRecord);
+            dataRecords.put(dataRecord.getOldKey(), dataRecord);
         }
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
index 9cac9104192..0c0e20b78a9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporter.java
@@ -242,7 +242,7 @@ public final class DataSourceImporter extends AbstractLifecycleExecutor implemen
             for (DataRecord each : dataRecords) {
                 conditionColumns = RecordUtils.extractConditionColumns(each, importerConfig.getShardingColumns(each.getTableName()));
                 for (int i = 0; i < conditionColumns.size(); i++) {
-                    preparedStatement.setObject(i + 1, conditionColumns.get(i).getValue());
+                    preparedStatement.setObject(i + 1, conditionColumns.get(i).getOldValue());
                 }
                 preparedStatement.addBatch();
             }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
index 208fab4060b..fa8c7c5ca33 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DataRecordMergerTest.java
@@ -274,9 +274,9 @@ class DataRecordMergerTest {
         DataRecord result = new DataRecord(new PlaceholderPosition(), 3);
         result.setType(IngestDataChangeType.DELETE);
         result.setTableName(tableName);
-        result.addColumn(new Column("id", id, true, true));
-        result.addColumn(new Column("user_id", userId, true, false));
-        result.addColumn(new Column("total_price", totalPrice, true, false));
+        result.addColumn(new Column("id", id, null, true, true));
+        result.addColumn(new Column("user_id", userId, null, true, false));
+        result.addColumn(new Column("total_price", totalPrice, null, true, false));
         return result;
     }
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 0f1cb52891a..7a511f419d8 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -188,7 +188,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
                 if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
                     continue;
                 }
-                record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, columnMetaData.isUniqueKey()));
+                record.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), null, true, columnMetaData.isUniqueKey()));
             }
             channel.pushRecord(record);
         }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index de8127d40df..c9be2e60f2f 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -117,7 +117,7 @@ public final class WALEventConverter {
         // TODO Unique key may be a column within unique index
         List<String> primaryKeyColumns = tableMetaData.getPrimaryKeyColumns();
         for (int i = 0; i < event.getPrimaryKeys().size(); i++) {
-            result.addColumn(new Column(primaryKeyColumns.get(i), event.getPrimaryKeys().get(i), true, true));
+            result.addColumn(new Column(primaryKeyColumns.get(i), event.getPrimaryKeys().get(i), null, true, true));
         }
         return result;
     }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
index f84e16327f8..c21aa60fcd3 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/DataSourceRecordConsumer.java
@@ -125,8 +125,9 @@ public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
                     }
                     break;
                 case DELETE:
-                    Object orderId = convertValueFromAny(tableMetaData, afterMap.get("order_id"));
-                    preparedStatement.setObject(1, orderId);
+                    TableColumn orderId = record.getBeforeList().stream().filter(each -> "order_id".equals(each.getName())).findFirst().orElseThrow(() ->
+                            new UnsupportedOperationException("No primary key found in the t_order"));
+                    preparedStatement.setObject(1, convertValueFromAny(tableMetaData, orderId));
                     preparedStatement.execute();
                     break;
                 default:
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
index 4db635f2e62..1620b0be8b0 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/DataSourceImporterTest.java
@@ -32,7 +32,6 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.importer.DataSourceImporter;
 import org.apache.shardingsphere.data.pipeline.core.importer.connector.DataSourceImporterConnector;
-import org.apache.shardingsphere.data.pipeline.core.record.RecordUtils;
 import org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
 import org.junit.jupiter.api.BeforeEach;
@@ -45,7 +44,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -120,10 +118,10 @@ class DataSourceImporterTest {
         when(connection.prepareStatement(any())).thenReturn(preparedStatement);
         when(channel.fetchRecords(anyInt(), anyInt())).thenReturn(mockRecords(updateRecord));
         jdbcImporter.run();
-        verify(preparedStatement).setObject(1, 10);
+        verify(preparedStatement).setObject(1, 20);
         verify(preparedStatement).setObject(2, "UPDATE");
         verify(preparedStatement).setObject(3, 1);
-        verify(preparedStatement).setObject(4, 10);
+        verify(preparedStatement).setObject(4, 20);
         verify(preparedStatement).executeUpdate();
     }
     
@@ -147,15 +145,11 @@ class DataSourceImporterTest {
         result.setTableName(TABLE_NAME);
         result.setType("UPDATE");
         result.addColumn(new Column("id", 1, 2, true, true));
-        result.addColumn(new Column("user", 10, true, false));
-        result.addColumn(new Column("status", "UPDATE", true, false));
+        result.addColumn(new Column("user", 0, 10, true, false));
+        result.addColumn(new Column("status", null, "UPDATE", true, false));
         return result;
     }
     
-    private Collection<Column> mockConditionColumns(final DataRecord dataRecord) {
-        return RecordUtils.extractConditionColumns(dataRecord, Collections.singleton("user"));
-    }
-    
     private List<Record> mockRecords(final DataRecord dataRecord) {
         List<Record> result = new LinkedList<>();
         result.add(dataRecord);
@@ -167,9 +161,32 @@ class DataSourceImporterTest {
         DataRecord result = new DataRecord(new PlaceholderPosition(), 3);
         result.setTableName(TABLE_NAME);
         result.setType(recordType);
-        result.addColumn(new Column("id", 1, false, true));
-        result.addColumn(new Column("user", 10, true, false));
-        result.addColumn(new Column("status", recordType, true, false));
+        Integer idOldValue = null;
+        Integer userOldValue = null;
+        Integer idValue = null;
+        Integer userValue = null;
+        String statusOldValue = null;
+        String statusValue = null;
+        if ("INSERT".equals(recordType)) {
+            idValue = 1;
+            userValue = 10;
+            statusValue = recordType;
+        }
+        if ("UPDATE".equals(recordType)) {
+            idOldValue = 1;
+            idValue = idOldValue;
+            userOldValue = 10;
+            userValue = 20;
+            statusValue = recordType;
+        }
+        if ("DELETE".equals(recordType)) {
+            idOldValue = 1;
+            userOldValue = 10;
+            statusOldValue = recordType;
+        }
+        result.addColumn(new Column("id", idOldValue, idValue, false, true));
+        result.addColumn(new Column("user", userOldValue, userValue, true, false));
+        result.addColumn(new Column("status", statusOldValue, statusValue, true, false));
         return result;
     }