You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vb...@apache.org on 2023/04/23 02:21:59 UTC
[hudi] branch master updated: [HUDI-4920] fix PartialUpdatePayload cannot return deleted record in … (#6799)
This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 71752e285dd [HUDI-4920] fix PartialUpdatePayload cannot return deleted record in … (#6799)
71752e285dd is described below
commit 71752e285ddfb3759a29aa6666ebc5d588930e2d
Author: 冯健 <fe...@gmail.com>
AuthorDate: Sun Apr 23 10:21:52 2023 +0800
[HUDI-4920] fix PartialUpdatePayload cannot return deleted record in … (#6799)
Co-authored-by: fengjian <fe...@dipeak.com>
---
.../common/model/PartialUpdateAvroPayload.java | 68 +++++++++++++++-------
.../common/model/TestPartialUpdateAvroPayload.java | 36 +++++++++---
2 files changed, 74 insertions(+), 30 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
index 08c020f63ca..7c09a38d56f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
@@ -18,17 +18,17 @@
package org.apache.hudi.common.model;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
+
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.avro.generic.IndexedRecord;
-
import java.io.IOException;
import java.util.List;
import java.util.Properties;
@@ -85,7 +85,7 @@ import java.util.Properties;
* Result data after preCombine or combineAndGetUpdateValue:
* id ts name price
* 1 2 name_1 price_1
- *</pre>
+ * </pre>
*/
public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvroPayload {
@@ -100,14 +100,14 @@ public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvro
@Override
public PartialUpdateAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Schema schema, Properties properties) {
if (oldValue.recordBytes.length == 0) {
- // use natural order for delete record
+ // use natural order for deleted record
return this;
}
// pick the payload with greater ordering value as insert record
final boolean shouldPickOldRecord = oldValue.orderingVal.compareTo(orderingVal) > 0 ? true : false;
try {
GenericRecord oldRecord = HoodieAvroUtils.bytesToAvro(oldValue.recordBytes, schema);
- Option<IndexedRecord> mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord);
+ Option<IndexedRecord> mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord, true);
if (mergedRecord.isPresent()) {
return new PartialUpdateAvroPayload((GenericRecord) mergedRecord.get(),
shouldPickOldRecord ? oldValue.orderingVal : this.orderingVal);
@@ -120,12 +120,12 @@ public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvro
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
- return this.mergeOldRecord(currentValue, schema, false);
+ return this.mergeOldRecord(currentValue, schema, false, false);
}
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties prop) throws IOException {
- return mergeOldRecord(currentValue, schema, isRecordNewer(orderingVal, currentValue, prop));
+ return mergeOldRecord(currentValue, schema, isRecordNewer(orderingVal, currentValue, prop), false);
}
/**
@@ -139,18 +139,31 @@ public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvro
// Utilities
// -------------------------------------------------------------------------
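+ /**
+ * Merges the old record with the record held by this payload.
+ *
+ * @param oldRecord the existing record to merge with
+ * @param schema the record schema
+ * @param isOldRecordNewer whether the old record should take precedence over this payload's record
+ * @param isPreCombining flag for deleted record combine logic
+ * 1 preCombine: if the delete record is newer, return the merged record with _hoodie_is_deleted = true
+ * 2 combineAndGetUpdateValue: if the delete record is newer, return empty since deleted data does not need to be written to storage
+ * @return the merged record, or empty when this payload yields no insert value outside of pre-combining
+ * @throws IOException if reading this payload's record fails
+ */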
private Option<IndexedRecord> mergeOldRecord(IndexedRecord oldRecord,
- Schema schema,
- boolean isOldRecordNewer) throws IOException {
- Option<IndexedRecord> recordOption = getInsertValue(schema);
- if (!recordOption.isPresent()) {
+ Schema schema,
+ boolean isOldRecordNewer, boolean isPreCombining) throws IOException {
+ Option<IndexedRecord> recordOption = getInsertValue(schema, isPreCombining);
+
+ if (!recordOption.isPresent() && !isPreCombining) {
// use natural order for delete record
return Option.empty();
}
if (isOldRecordNewer && schema.getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD) != null) {
// handling disorder, should use the metadata fields of the updating record
- return mergeDisorderRecordsWithMetadata(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get());
+ return mergeDisorderRecordsWithMetadata(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get(), isPreCombining);
} else if (isOldRecordNewer) {
return mergeRecords(schema, (GenericRecord) oldRecord, (GenericRecord) recordOption.get());
} else {
@@ -158,20 +171,34 @@ public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvro
}
}
+ /**
+ * Returns the Avro record held by this payload, or empty when there is nothing to insert.
+ * While pre-combining, a payload flagged as deleted is still deserialized and returned
+ * instead of being dropped.
+ *
+ * @param schema the schema used to deserialize the payload bytes
+ * @param isPreCombining true when called on the preCombine path; false on the combineAndGetUpdateValue path
+ * @return the deserialized record; empty if the payload has no bytes, or if it is flagged as deleted and isPreCombining is false
+ * @throws IOException declared for deserialization failures (presumably raised by HoodieAvroUtils.bytesToAvro; confirm)
+ */
+ public Option<IndexedRecord> getInsertValue(Schema schema, boolean isPreCombining) throws IOException {
+ // An empty byte array always means "no record"; the deleted flag only suppresses the record outside of pre-combining.
+ if (recordBytes.length == 0 || (!isPreCombining && isDeletedRecord)) {
+ return Option.empty();
+ }
+
+ return Option.of((IndexedRecord) HoodieAvroUtils.bytesToAvro(recordBytes, schema));
+ }
+
/**
* Merges the given disorder records with metadata.
*
* @param schema The record schema
* @param oldRecord The current record from file
* @param updatingRecord The incoming record
- *
* @return the merged record option
*/
protected Option<IndexedRecord> mergeDisorderRecordsWithMetadata(
Schema schema,
GenericRecord oldRecord,
- GenericRecord updatingRecord) {
- if (isDeleteRecord(oldRecord)) {
+ GenericRecord updatingRecord, boolean isPreCombining) {
+ if (isDeleteRecord(oldRecord) && !isPreCombining) {
return Option.empty();
} else {
final GenericRecordBuilder builder = new GenericRecordBuilder(schema);
@@ -197,9 +224,8 @@ public class PartialUpdateAvroPayload extends OverwriteNonDefaultsWithLatestAvro
* Returns whether the given record is newer than the record of this payload.
*
* @param orderingVal
- * @param record The record
- * @param prop The payload properties
- *
+ * @param record The record
+ * @param prop The payload properties
* @return true if the given record is newer
*/
private static boolean isRecordNewer(Comparable orderingVal, IndexedRecord record, Properties prop) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
index 2d5bb511981..6431b63899f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestPartialUpdateAvroPayload.java
@@ -18,11 +18,13 @@
package org.apache.hudi.common.model;
-import org.apache.hudi.common.util.Option;
-
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.Option;
+
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -34,6 +36,8 @@ import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
/**
* Unit tests for {@link PartialUpdateAvroPayload}.
@@ -147,24 +151,38 @@ public class TestPartialUpdateAvroPayload {
GenericRecord record2 = new GenericData.Record(schema);
record2.put("id", "1");
record2.put("partition", "partition0");
- record2.put("ts", 0L);
- record2.put("_hoodie_is_deleted", true);
+ record2.put("ts", 2L);
+ record2.put("_hoodie_is_deleted", false);
record2.put("city", "NY0");
record2.put("child", Collections.emptyList());
PartialUpdateAvroPayload payload1 = new PartialUpdateAvroPayload(record1, 0L);
- PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(delRecord1, 1L);
+ PartialUpdateAvroPayload delPayload = new PartialUpdateAvroPayload(delRecord1, 1L);
+ PartialUpdateAvroPayload payload2 = new PartialUpdateAvroPayload(record2, 2L);
+
+ PartialUpdateAvroPayload mergedPayload = payload1.preCombine(delPayload, schema, new Properties());
+ assertTrue(HoodieAvroUtils.bytesToAvro(mergedPayload.recordBytes, schema).get("_hoodie_is_deleted").equals(true));
+ assertArrayEquals(mergedPayload.recordBytes, delPayload.recordBytes);
+
+ mergedPayload = delPayload.preCombine(payload1, schema, new Properties());
+ assertTrue(HoodieAvroUtils.bytesToAvro(mergedPayload.recordBytes, schema).get("_hoodie_is_deleted").equals(true));
+ assertArrayEquals(mergedPayload.recordBytes, delPayload.recordBytes);
+
+ mergedPayload = payload2.preCombine(delPayload, schema, new Properties());
+ assertTrue(HoodieAvroUtils.bytesToAvro(mergedPayload.recordBytes, schema).get("_hoodie_is_deleted").equals(false));
+ assertArrayEquals(mergedPayload.recordBytes, payload2.recordBytes);
- assertArrayEquals(payload1.preCombine(payload2).recordBytes, payload2.recordBytes);
- assertArrayEquals(payload2.preCombine(payload1).recordBytes, payload2.recordBytes);
+ mergedPayload = delPayload.preCombine(payload2, schema, new Properties());
+ assertTrue(HoodieAvroUtils.bytesToAvro(mergedPayload.recordBytes, schema).get("_hoodie_is_deleted").equals(false));
+ assertArrayEquals(mergedPayload.recordBytes, payload2.recordBytes);
assertEquals(record1, payload1.getInsertValue(schema).get());
- assertFalse(payload2.getInsertValue(schema).isPresent());
+ assertFalse(delPayload.getInsertValue(schema).isPresent());
Properties properties = new Properties();
properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts");
assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema, properties), Option.empty());
- assertFalse(payload2.combineAndGetUpdateValue(record1, schema, properties).isPresent());
+ assertFalse(delPayload.combineAndGetUpdateValue(record1, schema, properties).isPresent());
}
@Test