You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/03/02 01:57:10 UTC

[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4880: [HUDI-2752] The MOR DELETE block breaks the event time sequence of CDC

alexeykudinkin commented on a change in pull request #4880:
URL: https://github.com/apache/hudi/pull/4880#discussion_r817279089



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
##########
@@ -114,4 +114,16 @@ default T preCombine(T oldValue, Properties properties) {
   default Option<Map<String, String>> getMetadata() {
     return Option.empty();
   }
+
+  /**
+   * This method can be used to extract the ordering value of the payload for combining/merging,
+   * or 0 if no value is specified which means natural order.

Review comment:
       But what does "later" mean in that sense? Isn't `orderingVal` supposed to be ordering those?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/DeleteKey.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import java.util.Objects;
+
+/**
+ * Delete key is a combination of HoodieKey and ordering value.
+ * The key is used for {@link org.apache.hudi.common.table.log.block.HoodieDeleteBlock}
+ * to support per-record deletions. The deletion block is always appended after the data block,
+ * we need to keep the ordering val to combine with the data records when merging, or the data may
+ * be dropped if there are intermediate deletions for the inputs
+ * (a new INSERT comes after a DELETE in one input batch).
+ */
+public class DeleteKey extends HoodieKey {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * For purposes of preCombining.
+   */
+  private Comparable orderingVal;
+
+  private DeleteKey(String recordKey, String partitionPath, Comparable orderingVal) {
+    super(recordKey, partitionPath);
+    this.orderingVal = orderingVal;
+  }
+
+  public static DeleteKey create(HoodieKey hoodieKey, Comparable orderingVal) {
+    return create(hoodieKey.getRecordKey(), hoodieKey.getPartitionPath(), orderingVal);
+  }
+
+  public static DeleteKey create(String recordKey, String partitionPath) {
+    return create(recordKey, partitionPath, 0);

Review comment:
       This is not a version though, this is an `orderingVal` and this is very confusing -- does 0 mean that this should have highest priority?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
##########
@@ -153,9 +154,29 @@ protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoo
   }
 
   @Override
-  protected void processNextDeletedKey(HoodieKey hoodieKey) {
-    records.put(hoodieKey.getRecordKey(), SpillableMapUtils.generateEmptyPayload(hoodieKey.getRecordKey(),
-        hoodieKey.getPartitionPath(), getPayloadClassFQN()));
+  protected void processNextDeletedKey(DeleteKey deleteKey) {
+    String key = deleteKey.getRecordKey();
+    if (records.containsKey(key)) {
+      // Merge and store the merged record. The ordering val is taken to decide whether the same key record
+      // should be deleted or be kept. The old record is kept only if the DELETE record has smaller ordering val.
+      // For same ordering values, uses the natural order.
+
+      HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
+      Comparable oldOrderingVal = oldRecord.getData().getOrderingVal();
+      Comparable newOrderingVal = deleteKey.getOrderingVal();
+      // Checks the ordering value does not equal to 0
+      // because we use 0 as the default value which means natural order
+      boolean choosePrev = !newOrderingVal.equals(0)
+          && oldOrderingVal.getClass() == newOrderingVal.getClass()

Review comment:
       That default value then should be passed as long (otherwise `0 (int) != 0 (long)` which doesn't make sense)

##########
File path: hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
##########
@@ -516,17 +516,14 @@ class TestMORDataSource extends HoodieClientTestBase {
     checkAnswer((1, "a0", 12, 101, false))
 
     writeData((1, "a0", 16, 97, true))
-    // Ordering value will not be honored for a delete record as the payload is sent as empty payload
-    checkAnswer((1, "a0", 16, 97, true))
+    // Ordering value will be honored, the delete record is considered as obsolete
+    // because it has smaller version number (97 < 101)
+    checkAnswer((1, "a0", 12, 101, false))
 
     writeData((1, "a0", 18, 96, false))
-    // Ideally, once a record is deleted, preCombine does not kick. So, any new record will be considered valid ignoring
-    // ordering val. But what happens ini hudi is, all records in log files are reconciled and then merged with base

Review comment:
       It's not about your fix -- the previous test was describing following issue: if in the _logs_ we have U1 (update), then D2 (deletion), then U3, currently U1 and D1 will cancel each other out and we will only carry U3. This is incorrect, instead we should be carrying both D1 and U3, b/c D1 will have to be applied against base as well, before applying U3

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
##########
@@ -1091,6 +1092,123 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
     assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete");
   }
 
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType diskMapType,
+                                                        boolean isCompressionEnabled,
+                                                        boolean readBlocksLazily)
+      throws IOException, URISyntaxException, InterruptedException {
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+    // Set a small threshold so that every block is a new version
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
+
+    // Write 1
+    List<IndexedRecord> records1 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> copyOfRecords1 = records1.stream()
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records1, header);
+    writer.appendBlock(dataBlock);
+
+    // Write 2
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
+    List<IndexedRecord> records2 = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    List<IndexedRecord> copyOfRecords2 = records2.stream()
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
+    writer.appendBlock(dataBlock);
+
+    copyOfRecords1.addAll(copyOfRecords2);
+    List<String> originalKeys =
+        copyOfRecords1.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
+            .collect(Collectors.toList());
+
+    // Delete 10 keys
+    // Default orderingVal is 0, which means natural order, the DELETE records
+    // should overwrite the data records.
+    List<DeleteKey> deletedKeys1 = copyOfRecords1.subList(0, 10).stream()
+        .map(s -> (DeleteKey.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
+        .collect(Collectors.toList());
+
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
+    HoodieDeleteBlock deleteBlock1 = new HoodieDeleteBlock(deletedKeys1.toArray(new DeleteKey[0]), header);
+    writer.appendBlock(deleteBlock1);
+
+    // Delete another 10 keys with -1 as orderingVal.
+    // The deletion should not work
+
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
+    HoodieDeleteBlock deleteBlock2 = new HoodieDeleteBlock(copyOfRecords1.subList(10, 20).stream()
+        .map(s -> (DeleteKey.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), -1))).toArray(DeleteKey[]::new), header);
+    writer.appendBlock(deleteBlock2);
+
+    // Delete another 10 keys with +1 as orderingVal.
+    // The deletion should work because the keys has greater ordering value.
+    List<DeleteKey> deletedKeys3 = copyOfRecords1.subList(20, 30).stream()
+        .map(s -> (DeleteKey.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), 1)))
+        .collect(Collectors.toList());
+
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "104");
+    HoodieDeleteBlock deleteBlock3 = new HoodieDeleteBlock(deletedKeys3.toArray(new DeleteKey[0]), header);
+    writer.appendBlock(deleteBlock3);
+
+    List<String> allLogFiles =
+        FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100")
+            .map(s -> s.getPath().toString()).collect(Collectors.toList());
+
+    FileCreateUtils.createDeltaCommit(basePath, "100", fs);
+    FileCreateUtils.createDeltaCommit(basePath, "101", fs);
+    FileCreateUtils.createDeltaCommit(basePath, "102", fs);
+    FileCreateUtils.createDeltaCommit(basePath, "103", fs);
+    FileCreateUtils.createDeltaCommit(basePath, "104", fs);
+
+    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("104")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .withDiskMapType(diskMapType)
+        .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+        .build();
+
+    assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records");
+    final List<String> readKeys = new ArrayList<>(200);
+    final List<String> emptyPayloadKeys = new ArrayList<>();
+    scanner.forEach(s -> readKeys.add(s.getRecordKey()));
+    scanner.forEach(s -> {

Review comment:
       Can you please elaborate how's it more clear? 
   
   > Combining is not the gold rule in all of the places.
   
   It's not about gold rules, it's about whether it makes sense or not. Iterating twice over the same collection doesn't really make sense when we can do it in one iteration, right? Why burn CI/workstation compute for no reason? It might be fine in the context of one test but keep in mind that we have thousands of tests that we need to run.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/DeleteKey.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import java.util.Objects;
+
+/**
+ * Delete key is a combination of HoodieKey and ordering value.
+ * The key is used for {@link org.apache.hudi.common.table.log.block.HoodieDeleteBlock}
+ * to support per-record deletions. The deletion block is always appended after the data block,
+ * we need to keep the ordering val to combine with the data records when merging, or the data may
+ * be dropped if there are intermediate deletions for the inputs
+ * (a new INSERT comes after a DELETE in one input batch).
+ */
+public class DeleteKey extends HoodieKey {

Review comment:
       > For BWC, i would expect for other contributors can address this in following PR.
   
   I don't think we can treat BWC as optional here. What do you think @vinothchandar? 

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/DeleteKey.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import java.util.Objects;
+
+/**
+ * Delete key is a combination of HoodieKey and ordering value.
+ * The key is used for {@link org.apache.hudi.common.table.log.block.HoodieDeleteBlock}
+ * to support per-record deletions. The deletion block is always appended after the data block,
+ * we need to keep the ordering val to combine with the data records when merging, or the data may
+ * be dropped if there are intermediate deletions for the inputs
+ * (a new INSERT comes after a DELETE in one input batch).
+ */
+public class DeleteKey extends HoodieKey {

Review comment:
       > The deletion payload need to execute combineAndGetUpdateValue with the records in base file so it should be the same payload class with the data record.
   
   This is an interesting point you're bringing up. First of all, i think we have to implement deletions in a uniform way across both COW/MOR, i see no reason for us to duplicate the logic since use-cases are comparable (we simply need to carry a tombstone w/in the payload either in the log or in the batch) 
   
   Second, we need to keep in mind that the `RecordPayload` is just the facade carrying the semantic of how we a) resolve "duplicates" w/in written batch (`preCombine`) or b) merge records (`combineAndGetUpdateValue`). Therefore i don't see an issue with 2 records having different `RecordPayload` classes if they require different merge semantics (tombstone have to clump preceding value, w/o any merging)
   
   > There is already an EmptyHoodieRecordPayload used for SQL deletions.
   
   SG, my proposal was more of a concept. We should def re-use existing facilities.
   

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
##########
@@ -114,4 +114,16 @@ default T preCombine(T oldValue, Properties properties) {
   default Option<Map<String, String>> getMetadata() {
     return Option.empty();
   }
+
+  /**
+   * This method can be used to extract the ordering value of the payload for combining/merging,
+   * or 0 if no value is specified which means natural order.

Review comment:
       Oh i see, it's referring to "event" time vs "arrival" time. Can we instead use event/arrival terminology to make things clear?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org