Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/03/03 04:19:31 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #4880: [HUDI-2752] The MOR DELETE block breaks the event time sequence of CDC

vinothchandar commented on a change in pull request #4880:
URL: https://github.com/apache/hudi/pull/4880#discussion_r817913665



##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkWriteHelper.java
##########
@@ -105,7 +105,7 @@ public static FlinkWriteHelper newInstance() {
       // we cannot allow the user to change the key or partitionPath, since that will affect
       // everything
       // so pick it from one of the records.
-      boolean choosePrev = data1.equals(reducedData);
+      boolean choosePrev = data1 == reducedData;

Review comment:
       Hmm, why this change?
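
For context on the question above: the two comparisons behave differently whenever a payload class overrides `equals()`. The sketch below uses a hypothetical `Payload` class (not a Hudi type) whose `preCombine` returns one of the two input instances. The thread does not state the actual motivation for the change, so this is only one possible reading.

```java
import java.util.Objects;

public class IdentityVsEquals {

  /** Hypothetical payload with value-based equals(); not a real Hudi class. */
  static final class Payload {
    final String value;
    final long orderingVal;

    Payload(String value, long orderingVal) {
      this.value = value;
      this.orderingVal = orderingVal;
    }

    /** Returns the instance with the larger ordering value; ties keep this instance. */
    Payload preCombine(Payload other) {
      return other.orderingVal > this.orderingVal ? other : this;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof Payload)) {
        return false;
      }
      Payload that = (Payload) o;
      return orderingVal == that.orderingVal && Objects.equals(value, that.value);
    }

    @Override
    public int hashCode() {
      return Objects.hash(value, orderingVal);
    }
  }

  /** Value comparison: true for any payload with equal content. */
  static boolean choosePrevByEquals(Payload data1, Payload reduced) {
    return data1.equals(reduced);
  }

  /** Reference comparison: true only if preCombine returned data1 itself. */
  static boolean choosePrevByIdentity(Payload data1, Payload reduced) {
    return data1 == reduced;
  }

  public static void main(String[] args) {
    Payload data1 = new Payload("a0", 101);
    Payload data2 = new Payload("a0", 101);
    Payload reduced = data2.preCombine(data1); // tie, so data2 is returned

    System.out.println(choosePrevByEquals(data1, reduced));   // true
    System.out.println(choosePrevByIdentity(data1, reduced)); // false
  }
}
```

With value-based equality, two distinct records carrying identical content both look like "the previous one"; the identity check only reports which instance `preCombine` actually returned.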

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
##########
@@ -114,4 +114,16 @@ default T preCombine(T oldValue, Properties properties) {
   default Option<Map<String, String>> getMetadata() {
     return Option.empty();
   }
+
+  /**
+   * This method can be used to extract the ordering value of the payload for combining/merging,
+   * or 0 if no value is specified which means natural order.

Review comment:
       At the user level, we expose this as a "precombineValue" in DeltaStreamer, Spark, and Flink, and the API here is called `preCombine()`. I am all for embracing event-time and arrival-time semantics, but for now, to stay consistent with the current `RecordPayload` methods, maybe call this `getCombineValue()` or something similar?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
##########
@@ -411,7 +411,7 @@ private void processDataBlock(HoodieDataBlock dataBlock, Option<List<String>> ke
    *
    * @param key Deleted record key
    */
-  protected abstract void processNextDeletedKey(HoodieKey key);

Review comment:
       We could just rename DeleteKey -> DeleteRecord, i.e. the data block persists HoodieRecord and the delete block persists DeleteRecord, and have that wrap `HoodieKey` instead of extending it?
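
A minimal sketch of the composition approach suggested above. Both classes are simplified stand-ins: the nested `HoodieKey` only models a record key and partition path, and `DeleteRecord` with its `create` factory is a hypothetical shape, not the class that was eventually merged. Defaulting the ordering value to 0 follows the Javadoc quoted earlier in this review ("0 if no value is specified").

```java
import java.util.Objects;

public class DeleteRecordSketch {

  /** Simplified stand-in for Hudi's HoodieKey (record key + partition path). */
  static final class HoodieKey {
    private final String recordKey;
    private final String partitionPath;

    HoodieKey(String recordKey, String partitionPath) {
      this.recordKey = recordKey;
      this.partitionPath = partitionPath;
    }

    String getRecordKey() {
      return recordKey;
    }

    String getPartitionPath() {
      return partitionPath;
    }
  }

  /** Hypothetical DeleteRecord: wraps a key (composition) instead of extending it. */
  static final class DeleteRecord {
    private final HoodieKey hoodieKey;
    private final Comparable<?> orderingVal;

    private DeleteRecord(HoodieKey hoodieKey, Comparable<?> orderingVal) {
      this.hoodieKey = hoodieKey;
      this.orderingVal = orderingVal;
    }

    /** Assumption: a null ordering value falls back to 0, meaning natural order. */
    static DeleteRecord create(HoodieKey key, Comparable<?> orderingVal) {
      Objects.requireNonNull(key, "key");
      return new DeleteRecord(key, orderingVal == null ? Integer.valueOf(0) : orderingVal);
    }

    HoodieKey getHoodieKey() {
      return hoodieKey;
    }

    // Delegating accessors keep call sites short without inheriting from HoodieKey.
    String getRecordKey() {
      return hoodieKey.getRecordKey();
    }

    String getPartitionPath() {
      return hoodieKey.getPartitionPath();
    }

    Comparable<?> getOrderingVal() {
      return orderingVal;
    }
  }

  public static void main(String[] args) {
    DeleteRecord rec = DeleteRecord.create(new HoodieKey("id1", "2022/03/03"), 97L);
    System.out.println(rec.getRecordKey() + " @ " + rec.getOrderingVal());
  }
}
```

Because `DeleteRecord` is not a `HoodieKey`, it cannot be passed by accident to code that expects a plain key, which is the distinction the comment is asking for.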

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
##########
@@ -153,9 +154,29 @@ protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoo
   }
 
   @Override
-  protected void processNextDeletedKey(HoodieKey hoodieKey) {
-    records.put(hoodieKey.getRecordKey(), SpillableMapUtils.generateEmptyPayload(hoodieKey.getRecordKey(),
-        hoodieKey.getPartitionPath(), getPayloadClassFQN()));
+  protected void processNextDeletedKey(DeleteKey deleteKey) {
+    String key = deleteKey.getRecordKey();
+    if (records.containsKey(key)) {
+      // Merge and store the merged record. The ordering val is taken to decide whether the same key record
+      // should be deleted or be kept. The old record is kept only if the DELETE record has smaller ordering val.
+      // For same ordering values, uses the natural order.
+
+      HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
+      Comparable oldOrderingVal = oldRecord.getData().getOrderingVal();
+      Comparable newOrderingVal = deleteKey.getOrderingVal();

Review comment:
       IMO currentOrderingVal and deleteOrderingVal are clearer. Leave it to you.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
##########
@@ -86,7 +86,7 @@ public HoodieDeleteBlock(Option<byte[]> content, FSDataInputStream inputStream,
         int dataLength = dis.readInt();
         byte[] data = new byte[dataLength];
         dis.readFully(data);
-        this.keysToDelete = SerializationUtils.<HoodieKey[]>deserialize(data);
+        this.keysToDelete = SerializationUtils.<DeleteKey[]>deserialize(data);

Review comment:
       That PR/JIRA would be a release blocker, and I suggest we introduce a flag here in this PR (an internal config) that turns off the new format. We can revert that in the follow-on PR; this way we are insulated from missing it.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
##########
@@ -39,9 +39,9 @@
  */
 public class HoodieDeleteBlock extends HoodieLogBlock {
 
-  private HoodieKey[] keysToDelete;
+  private DeleteKey[] keysToDelete;

Review comment:
       I think we should introduce a new log block version. Let me know if you need help breaking down the steps for doing that.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -475,12 +475,12 @@ private void writeToBuffer(HoodieRecord<T> record) {
     }
     Option<IndexedRecord> indexedRecord = getIndexedRecord(record);
     if (indexedRecord.isPresent()) {
-      // Skip the Ignore Record.
+      // Skip the ignored record.
       if (!indexedRecord.get().equals(IGNORE_RECORD)) {
         recordList.add(indexedRecord.get());
       }
     } else {
-      keysToDelete.add(record.getKey());
+      keysToDelete.add(DeleteKey.create(record.getKey(), record.getData().getOrderingVal()));
     }

Review comment:
       The log block supports versions. We need to create a new version of the HoodieDeleteBlock to handle both the old and new encoding.
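
To illustrate the idea of a reader that handles both encodings, here is a self-contained sketch. The byte layout, the version constants, and the `DeleteEntry` type are all assumptions made up for this example; Hudi's real delete block uses its own log block header and serialization, which this does not reproduce.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class VersionedDeleteBlockSketch {

  // Hypothetical version numbers, not Hudi's actual log block versions.
  static final int V1_KEYS_ONLY = 1;
  static final int V2_KEYS_WITH_ORDERING = 2;

  /** Hypothetical decoded entry: a record key plus an ordering value. */
  static final class DeleteEntry {
    final String recordKey;
    final long orderingVal;

    DeleteEntry(String recordKey, long orderingVal) {
      this.recordKey = recordKey;
      this.orderingVal = orderingVal;
    }
  }

  /** Writes [version][count][entries...]; the ordering value is written only for V2. */
  static byte[] encode(List<DeleteEntry> entries, int version) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(version);
      out.writeInt(entries.size());
      for (DeleteEntry entry : entries) {
        out.writeUTF(entry.recordKey);
        if (version >= V2_KEYS_WITH_ORDERING) {
          out.writeLong(entry.orderingVal);
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Reads either layout; V1 entries get ordering value 0, i.e. natural order. */
  static List<DeleteEntry> decode(byte[] content) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(content))) {
      int version = in.readInt();
      if (version != V1_KEYS_ONLY && version != V2_KEYS_WITH_ORDERING) {
        throw new IllegalStateException("Unsupported delete block version: " + version);
      }
      int count = in.readInt();
      List<DeleteEntry> entries = new ArrayList<>(count);
      for (int i = 0; i < count; i++) {
        String recordKey = in.readUTF();
        long orderingVal = version >= V2_KEYS_WITH_ORDERING ? in.readLong() : 0L;
        entries.add(new DeleteEntry(recordKey, orderingVal));
      }
      return entries;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    List<DeleteEntry> entries = new ArrayList<>();
    entries.add(new DeleteEntry("id1", 97L));
    System.out.println(decode(encode(entries, V2_KEYS_WITH_ORDERING)).get(0).orderingVal);
    System.out.println(decode(encode(entries, V1_KEYS_ONLY)).get(0).orderingVal);
  }
}
```

Since the reader branches on the stored version, blocks written before the change stay readable, and the `version` argument on the write side is the kind of switch a writer could use to keep emitting the old layout.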

##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
##########
@@ -516,17 +516,14 @@ class TestMORDataSource extends HoodieClientTestBase {
     checkAnswer((1, "a0", 12, 101, false))
 
     writeData((1, "a0", 16, 97, true))
-    // Ordering value will not be honored for a delete record as the payload is sent as empty payload
-    checkAnswer((1, "a0", 16, 97, true))
+    // Ordering value will be honored, the delete record is considered as obsolete
+    // because it has smaller version number (97 < 101)
+    checkAnswer((1, "a0", 12, 101, false))
 
     writeData((1, "a0", 18, 96, false))
-    // Ideally, once a record is deleted, preCombine does not kick. So, any new record will be considered valid ignoring
-    // ordering val. But what happens ini hudi is, all records in log files are reconciled and then merged with base

Review comment:
       @alexeykudinkin can you clarify the high-level concern here? Do you think the test is wrong (with the changes in the PR), do we need more cases, or did you unearth a bug we still need to fix in the PR?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
##########
@@ -153,9 +154,29 @@ protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoo
   }
 
   @Override
-  protected void processNextDeletedKey(HoodieKey hoodieKey) {
-    records.put(hoodieKey.getRecordKey(), SpillableMapUtils.generateEmptyPayload(hoodieKey.getRecordKey(),
-        hoodieKey.getPartitionPath(), getPayloadClassFQN()));
+  protected void processNextDeletedKey(DeleteKey deleteKey) {
+    String key = deleteKey.getRecordKey();
+    if (records.containsKey(key)) {
+      // Merge and store the merged record. The ordering val is taken to decide whether the same key record
+      // should be deleted or be kept. The old record is kept only if the DELETE record has smaller ordering val.
+      // For same ordering values, uses the natural order.
+
+      HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
+      Comparable oldOrderingVal = oldRecord.getData().getOrderingVal();
+      Comparable newOrderingVal = deleteKey.getOrderingVal();
+      // Checks the ordering value does not equal to 0
+      // because we use 0 as the default value which means natural order
+      boolean choosePrev = !newOrderingVal.equals(0)
+          && oldOrderingVal.getClass() == newOrderingVal.getClass()
+          && oldOrderingVal.compareTo(newOrderingVal) > 0;
+      if (choosePrev) {
+        // The DELETE message is obsolete if the old message has greater orderingVal.

Review comment:
       nit: consistent terminology. "if the old record has greater orderingVal"

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/DeleteKey.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import java.util.Objects;
+
+/**
+ * Delete key is a combination of HoodieKey and ordering value.
+ * The key is used for {@link org.apache.hudi.common.table.log.block.HoodieDeleteBlock}
+ * to support per-record deletions. The deletion block is always appended after the data block,
+ * we need to keep the ordering val to combine with the data records when merging, or the data may
+ * be dropped if there are intermediate deletions for the inputs
+ * (a new INSERT comes after a DELETE in one input batch).
+ */
+public class DeleteKey extends HoodieKey {

Review comment:
       
   
   +1 on providing concrete suggestions to move forward.
   
   @danny0405 The main contention here is that `DeleteKey` is not really a key. We need some cleanup where we pass the orderingVal from the engine API (Flink level) all the way down to the actual places where I/O happens, and also mandate orderingVal across all the payloads. I was hoping we would do that in the new merge API design, but that's getting delayed now.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
##########
@@ -153,9 +154,29 @@ protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoo
   }
 
   @Override
-  protected void processNextDeletedKey(HoodieKey hoodieKey) {
-    records.put(hoodieKey.getRecordKey(), SpillableMapUtils.generateEmptyPayload(hoodieKey.getRecordKey(),
-        hoodieKey.getPartitionPath(), getPayloadClassFQN()));
+  protected void processNextDeletedKey(DeleteKey deleteKey) {
+    String key = deleteKey.getRecordKey();
+    if (records.containsKey(key)) {
+      // Merge and store the merged record. The ordering val is taken to decide whether the same key record
+      // should be deleted or be kept. The old record is kept only if the DELETE record has smaller ordering val.
+      // For same ordering values, uses the natural order.
+
+      HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
+      Comparable oldOrderingVal = oldRecord.getData().getOrderingVal();
+      Comparable newOrderingVal = deleteKey.getOrderingVal();
+      // Checks the ordering value does not equal to 0
+      // because we use 0 as the default value which means natural order
+      boolean choosePrev = !newOrderingVal.equals(0)
+          && oldOrderingVal.getClass() == newOrderingVal.getClass()
+          && oldOrderingVal.compareTo(newOrderingVal) > 0;
+      if (choosePrev) {

Review comment:
       nit: consistent terminology. old vs. prev.
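
The condition in the hunk above can be isolated into a small predicate. The sketch below mirrors that condition and uses the `currentOrderingVal` / `deleteOrderingVal` names proposed earlier in this review; the class and method names are hypothetical, and the sample values come from the `TestMORDataSource` case quoted in this thread.

```java
public class DeleteMergeSketch {

  /**
   * Returns true when the record already held by the scanner should survive the DELETE.
   * An ordering value of 0 on the DELETE is treated as "not specified", so the DELETE wins.
   */
  @SuppressWarnings({"rawtypes", "unchecked"})
  static boolean keepCurrent(Comparable currentOrderingVal, Comparable deleteOrderingVal) {
    return !deleteOrderingVal.equals(0)
        && currentOrderingVal.getClass() == deleteOrderingVal.getClass()
        && currentOrderingVal.compareTo(deleteOrderingVal) > 0;
  }

  public static void main(String[] args) {
    System.out.println(keepCurrent(101, 97));  // true: the DELETE is older than the record
    System.out.println(keepCurrent(101, 101)); // false: on a tie the DELETE is applied
    System.out.println(keepCurrent(96, 101));  // false: the DELETE is newer
    System.out.println(keepCurrent(101, 0));   // false: no ordering value on the DELETE
  }
}
```

Note that `equals(0)` compares against a boxed `Integer`, so a `Long` ordering value of `0L` would not match it; whether that matters depends on which ordering types the payloads actually produce.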

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/DeleteKey.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import java.util.Objects;
+
+/**
+ * Delete key is a combination of HoodieKey and ordering value.
+ * The key is used for {@link org.apache.hudi.common.table.log.block.HoodieDeleteBlock}
+ * to support per-record deletions. The deletion block is always appended after the data block,
+ * we need to keep the ordering val to combine with the data records when merging, or the data may
+ * be dropped if there are intermediate deletions for the inputs
+ * (a new INSERT comes after a DELETE in one input batch).
+ */
+public class DeleteKey extends HoodieKey {

Review comment:
       I don't think using a different payload is the right approach here. For Flink CDC, how do we propagate the deletes? Can we check what we are doing here against existing CDC payloads such as AWSDmsAvroPayload and the Debezium payloads (children of `AbstractDebeziumAvroPayload`) to confirm that the deletes will propagate correctly?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/DeleteKey.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import java.util.Objects;
+
+/**
+ * Delete key is a combination of HoodieKey and ordering value.
+ * The key is used for {@link org.apache.hudi.common.table.log.block.HoodieDeleteBlock}
+ * to support per-record deletions. The deletion block is always appended after the data block,
+ * we need to keep the ordering val to combine with the data records when merging, or the data may
+ * be dropped if there are intermediate deletions for the inputs
+ * (a new INSERT comes after a DELETE in one input batch).
+ */
+public class DeleteKey extends HoodieKey {

Review comment:
       FYI, `EmptyHoodieRecordPayload` is a special case for deleting with Spark without having to generate a DataFrame with the same schema as the table.



