Posted to commits@hudi.apache.org by da...@apache.org on 2023/04/16 03:02:22 UTC
[hudi] branch master updated: [HUDI-6050] Fix write helper deduplicate records lost origin data operation (#8410)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 94d999e90b1 [HUDI-6050] Fix write helper deduplicate records lost origin data operation (#8410)
94d999e90b1 is described below
commit 94d999e90b1d750f82a34c3e13d7fee5df8a0e0f
Author: Wangyh <76...@qq.com>
AuthorDate: Sun Apr 16 11:02:16 2023 +0800
[HUDI-6050] Fix write helper deduplicate records lost origin data operation (#8410)
Always use the operation of the chosen payload.
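For context, a minimal standalone sketch of the corrected reduce step is shown below. The class and method names here are illustrative only (they are not part of the patch); the real logic lives in HoodieWriteHelper#deduplicateRecords and JavaWriteHelper, shown in the diff that follows. The sketch assumes reducedRecord was already produced from the two duplicates by the configured HoodieRecordMerger.

    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.common.model.HoodieOperation;
    import org.apache.hudi.common.model.HoodieRecord;

    // Hypothetical helper mirroring the fixed reduce step in HoodieWriteHelper.
    final class DedupReduceSketch {
      static <T> HoodieRecord<T> pickKeyAndOperation(HoodieRecord<T> rec1,
                                                     HoodieRecord<T> rec2,
                                                     HoodieRecord<T> reducedRecord) {
        // The merger returns the winning payload; determine which input it came from.
        boolean choosePrev = rec1.getData().equals(reducedRecord.getData());
        // Key and partition path must not change, so reuse the chosen record's key ...
        HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
        // ... and, with this fix, also carry over that record's HoodieOperation
        // (e.g. INSERT vs UPDATE_AFTER) instead of dropping it during deduplication.
        HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();
        return reducedRecord.newInstance(reducedKey, operation);
      }
    }

Previously only the key was propagated, so the operation of the surviving record was lost when duplicates were combined; the test added below verifies the operation now survives dedup.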
---
.../table/action/commit/HoodieWriteHelper.java | 7 ++-
.../hudi/table/action/commit/JavaWriteHelper.java | 2 +-
.../TestHoodieClientOnCopyOnWriteStorage.java | 64 ++++++++++++++++++++++
3 files changed, 70 insertions(+), 3 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index b8e761acb02..47793d7155e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.util.collection.Pair;
@@ -77,8 +78,10 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
} catch (IOException e) {
throw new HoodieException(String.format("Error to merge two records, %s, %s", rec1, rec2), e);
}
- HoodieKey reducedKey = rec1.getData().equals(reducedRecord.getData()) ? rec1.getKey() : rec2.getKey();
- return reducedRecord.newInstance(reducedKey);
+ boolean choosePrev = rec1.getData().equals(reducedRecord.getData());
+ HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
+ HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();
+ return reducedRecord.newInstance(reducedKey, operation);
}, reduceParallelism).map(Pair::getRight);
}
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
index f3046bf22e2..04befe4ea16 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaWriteHelper.java
@@ -79,7 +79,7 @@ public class JavaWriteHelper<T,R> extends BaseWriteHelper<T, List<HoodieRecord<T
// we cannot allow the user to change the key or partitionPath, since that will affect
// everything
// so pick it from one of the records.
- return reducedRecord.newInstance(rec1.getKey());
+ return reducedRecord.newInstance(rec1.getKey(), rec1.getOperation());
}).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 222816ff772..78b7279aa56 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -48,6 +48,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -417,6 +418,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
testDeduplication(SparkRDDWriteClient::insert, populateMetaFields);
}
+ /**
+ * Test De-duplication behavior for HoodieWriteClient insert API.
+ */
+ @ParameterizedTest
+ @MethodSource("populateMetaFieldsParams")
+ public void testDeduplicationKeepOperationFieldOnInsert(boolean populateMetaFields) throws Exception {
+ testDeduplicationKeepOperation(SparkRDDWriteClient::insert, populateMetaFields);
+ }
+
/**
* Test De-duplication behavior for HoodieWriteClient bulk-insert API.
*/
@@ -503,6 +513,60 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
}
+ /**
+ * Test Deduplication Logic for write function.
+ *
+ * @param writeFn One of HoodieWriteClient non-prepped write APIs
+ * @throws Exception in case of failure
+ */
+ private void testDeduplicationKeepOperation(
+ Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean populateMetaFields) throws Exception {
+ String newCommitTime = "001";
+
+ String recordKey = UUID.randomUUID().toString();
+ HoodieKey keyOne = new HoodieKey(recordKey, "2018-01-01");
+ HoodieRecord<RawTripTestPayload> recordOne =
+ new HoodieAvroRecord(keyOne, dataGen.generateRandomValue(keyOne, newCommitTime), HoodieOperation.INSERT);
+
+ HoodieKey keyTwo = new HoodieKey(recordKey, "2018-02-01");
+ HoodieRecord recordTwo =
+ new HoodieAvroRecord(keyTwo, dataGen.generateRandomValue(keyTwo, newCommitTime), HoodieOperation.INSERT);
+
+ // Same key and partition as keyTwo
+ HoodieRecord recordThree =
+ new HoodieAvroRecord(keyTwo, dataGen.generateRandomValue(keyTwo, newCommitTime), HoodieOperation.UPDATE_AFTER);
+
+ HoodieData<HoodieRecord<RawTripTestPayload>> records = HoodieJavaRDD.of(
+ jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1));
+ HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAllowOperationMetadataField(true)
+ .combineInput(true, true);
+ addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);
+ HoodieWriteConfig writeConfig = configBuilder.build();
+
+ // Global dedup should be done based on recordKey only
+ HoodieIndex index = mock(HoodieIndex.class);
+ when(index.isGlobal()).thenReturn(true);
+ HoodieRecordMerger recordMerger = HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName());
+ int dedupParallelism = records.getNumPartitions() + 100;
+ HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd =
+ (HoodieData<HoodieRecord<RawTripTestPayload>>) HoodieWriteHelper.newInstance()
+ .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), recordMerger);
+ List<HoodieRecord<RawTripTestPayload>> dedupedRecs = dedupedRecsRdd.collectAsList();
+ assertEquals(dedupedRecs.get(0).getOperation(), recordThree.getOperation());
+
+ // Perform write-action and check
+ JavaRDD<HoodieRecord> recordList = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
+
+ try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig)) {
+ client.startCommitWithTime(newCommitTime);
+ List<WriteStatus> statuses = writeFn.apply(client, recordList, newCommitTime).collect();
+ assertNoWriteErrors(statuses);
+ assertEquals(2, statuses.size());
+ assertNodupesInPartition(statuses.stream().map(WriteStatus::getWrittenRecords).flatMap(Collection::stream)
+ .collect(Collectors.toList()));
+ }
+ }
+
/**
* Assert that there is no duplicate key at the partition level.
*