Posted to commits@hudi.apache.org by da...@apache.org on 2023/04/16 03:02:22 UTC

[hudi] branch master updated: [HUDI-6050] Fix write helper deduplicate records lost origin data operation (#8410)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 94d999e90b1 [HUDI-6050] Fix write helper deduplicate records lost origin data operation (#8410)
94d999e90b1 is described below

commit 94d999e90b1d750f82a34c3e13d7fee5df8a0e0f
Author: Wangyh <76...@qq.com>
AuthorDate: Sun Apr 16 11:02:16 2023 +0800

    [HUDI-6050] Fix write helper deduplicate records lost origin data operation (#8410)
    
    Always use the operation of the chosen payload.
---
 .../table/action/commit/HoodieWriteHelper.java     |  7 ++-
 .../hudi/table/action/commit/JavaWriteHelper.java  |  2 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 64 ++++++++++++++++++++++
 3 files changed, 70 insertions(+), 3 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index b8e761acb02..47793d7155e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.util.collection.Pair;
@@ -77,8 +78,10 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
       } catch (IOException e) {
         throw new HoodieException(String.format("Error to merge two records, %s, %s", rec1, rec2), e);
       }
-      HoodieKey reducedKey = rec1.getData().equals(reducedRecord.getData()) ? rec1.getKey() : rec2.getKey();
-      return reducedRecord.newInstance(reducedKey);
+      boolean choosePrev = rec1.getData().equals(reducedRecord.getData());
+      HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
+      HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();
+      return reducedRecord.newInstance(reducedKey, operation);
     }, reduceParallelism).map(Pair::getRight);
   }
 }
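A minimal sketch, not part of the commit itself, of what the common-path reduce step now does; the DedupReduceSketch class and method name are illustrative, but every accessor it uses appears in the hunk above. Whichever input record's payload survives the merge supplies both the key and the HoodieOperation, so change operations such as UPDATE_AFTER are no longer dropped during deduplication.

import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;

final class DedupReduceSketch {
  // Mirrors the fixed reducer: the record whose payload was chosen by the
  // merger supplies the key and, with HUDI-6050, also the operation.
  static <T> HoodieRecord<T> keepKeyAndOperation(HoodieRecord<T> rec1,
                                                 HoodieRecord<T> rec2,
                                                 HoodieRecord<T> reducedRecord) {
    boolean choosePrev = rec1.getData().equals(reducedRecord.getData());
    HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
    HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();
    return reducedRecord.newInstance(reducedKey, operation);
  }
}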
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index f3046bf22e2..04befe4ea16 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -79,7 +79,7 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T
       // we cannot allow the user to change the key or partitionPath, since that will affect
       // everything
       // so pick it from one of the records.
-      return reducedRecord.newInstance(rec1.getKey());
+      return reducedRecord.newInstance(rec1.getKey(), rec1.getOperation());
     }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
   }
 }
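The Java engine path above always keeps rec1's key and now its operation as well, while the common path shown earlier follows whichever payload the merger chose. As a hedged usage illustration (the class name, the "uuid-1" key and the payload parameters are placeholders, not taken from the commit), callers attach the operation when constructing records, just as the new test below does:

import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;

final class OperationTaggedRecords {
  // Builds two records that share a record key but carry different change
  // operations; after HUDI-6050, deduplicating them yields a record whose
  // operation matches the payload chosen by the configured merger, instead
  // of a record with no operation at all.
  static HoodieRecord[] sameKeyDifferentOps(HoodieRecordPayload older, HoodieRecordPayload newer) {
    HoodieKey key = new HoodieKey("uuid-1", "2018-02-01");
    return new HoodieRecord[] {
        new HoodieAvroRecord(key, older, HoodieOperation.INSERT),
        new HoodieAvroRecord(key, newer, HoodieOperation.UPDATE_AFTER)
    };
  }
}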
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 222816ff772..78b7279aa56 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -48,6 +48,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -417,6 +418,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     testDeduplication(SparkRDDWriteClient::insert, populateMetaFields);
   }
 
+  /**
+   * Test that de-duplication for the HoodieWriteClient insert API keeps the record operation field.
+   */
+  @ParameterizedTest
+  @MethodSource("populateMetaFieldsParams")
+  public void testDeduplicationKeepOperationFieldOnInsert(boolean populateMetaFields) throws Exception {
+    testDeduplicationKeepOperation(SparkRDDWriteClient::insert, populateMetaFields);
+  }
+
   /**
    * Test De-duplication behavior for HoodieWriteClient bulk-insert API.
    */
@@ -503,6 +513,60 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     }
   }
 
+  /**
+   * Test that de-duplication preserves the record operation for the given write function.
+   *
+   * @param writeFn One of HoodieWriteClient non-prepped write APIs
+   * @throws Exception in case of failure
+   */
+  private void testDeduplicationKeepOperation(
+      Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean populateMetaFields) throws Exception {
+    String newCommitTime = "001";
+
+    String recordKey = UUID.randomUUID().toString();
+    HoodieKey keyOne = new HoodieKey(recordKey, "2018-01-01");
+    HoodieRecord<RawTripTestPayload> recordOne =
+        new HoodieAvroRecord(keyOne, dataGen.generateRandomValue(keyOne, newCommitTime), HoodieOperation.INSERT);
+
+    HoodieKey keyTwo = new HoodieKey(recordKey, "2018-02-01");
+    HoodieRecord recordTwo =
+        new HoodieAvroRecord(keyTwo, dataGen.generateRandomValue(keyTwo, newCommitTime), HoodieOperation.INSERT);
+
+    // Same key and partition as keyTwo
+    HoodieRecord recordThree =
+        new HoodieAvroRecord(keyTwo, dataGen.generateRandomValue(keyTwo, newCommitTime), HoodieOperation.UPDATE_AFTER);
+
+    HoodieData<HoodieRecord<RawTripTestPayload>> records = HoodieJavaRDD.of(
+        jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1));
+    HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAllowOperationMetadataField(true)
+        .combineInput(true, true);
+    addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);
+    HoodieWriteConfig writeConfig = configBuilder.build();
+
+    // Global dedup should be done based on recordKey only
+    HoodieIndex index = mock(HoodieIndex.class);
+    when(index.isGlobal()).thenReturn(true);
+    HoodieRecordMerger recordMerger = HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
+    int dedupParallelism = records.getNumPartitions() + 100;
+    HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd =
+        (HoodieData<HoodieRecord<RawTripTestPayload>>) HoodieWriteHelper.newInstance()
+            .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), recordMerger);
+    List<HoodieRecord<RawTripTestPayload>> dedupedRecs = dedupedRecsRdd.collectAsList();
+    assertEquals(dedupedRecs.get(0).getOperation(), recordThree.getOperation());
+
+    // Perform write-action and check
+    JavaRDD<HoodieRecord> recordList = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
+      client.startCommitWithTime(newCommitTime);
+      List<WriteStatus> statuses = writeFn.apply(client, recordList, newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+      assertEquals(2, statuses.size());
+      assertNodupesInPartition(statuses.stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
+          .collect(Collectors.toList()));
+    }
+  }
+
   /**
    * Assert that there is no duplicate key at the partition level.
    *