You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@hudi.apache.org by xu...@apache.org on 2021/09/12 03:27:50 UTC

[hudi] branch master updated: [HUDI-2398] Collect event time for inserts in DefaultHoodieRecordPayload (#3602)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f991ee  [HUDI-2398] Collect event time for inserts in DefaultHoodieRecordPayload (#3602)
4f991ee is described below

commit 4f991ee3525c6225c7bf3b46e272f7d5b919196e
Author: Ankush Kanungo <40...@users.noreply.github.com>
AuthorDate: Sat Sep 11 20:27:40 2021 -0700

    [HUDI-2398] Collect event time for inserts in DefaultHoodieRecordPayload (#3602)
---
 .../apache/hudi/io/HoodieSortedMergeHandle.java    |  8 ++++----
 .../common/model/DefaultHoodieRecordPayload.java   | 23 ++++++++++++++-------
 .../model/TestDefaultHoodieRecordPayload.java      | 24 ++++++++++++++++++----
 3 files changed, 40 insertions(+), 15 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index 763178d..606e63a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -90,9 +90,9 @@ public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> ext
       }
       try {
         if (useWriterSchema) {
-          writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields));
+          writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
         } else {
-          writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema));
+          writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
         }
         insertRecordsWritten++;
         writtenRecordKeys.add(keyToPreWrite);
@@ -112,9 +112,9 @@ public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> ext
         HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
         if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
           if (useWriterSchema) {
-            writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields));
+            writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
           } else {
-            writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema));
+            writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
           }
           insertRecordsWritten++;
         }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index 86ccf67..76474fd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.model;
 
-import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.util.Option;
 
 import org.apache.avro.Schema;
@@ -56,7 +55,7 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
     if (recordBytes.length == 0) {
       return Option.empty();
     }
-    HoodieConfig hoodieConfig = new HoodieConfig(properties);
+
     GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
 
     // Null check is needed here to support schema evolution. The record in storage may be from old schema where
@@ -68,17 +67,27 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
     /*
      * We reached a point where the value is disk is older than the incoming record.
      */
-    eventTime = Option.ofNullable(getNestedFieldVal(incomingRecord, hoodieConfig
-        .getString(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), true));
+    eventTime = updateEventTime(incomingRecord, properties);
 
     /*
      * Now check if the incoming record is a delete record.
      */
-    if (isDeleteRecord(incomingRecord)) {
+    return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
+  }
+
+  @Override
+  public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException {
+    if (recordBytes.length == 0) {
       return Option.empty();
-    } else {
-      return Option.of(incomingRecord);
     }
+    GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
+    eventTime = updateEventTime(incomingRecord, properties);
+
+    return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
+  }
+
+  private static Option<Object> updateEventTime(GenericRecord record, Properties properties) {
+    return Option.ofNullable(getNestedFieldVal(record, properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), true));
   }
 
   @Override
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
index 5be0961..87d4e74 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
@@ -77,8 +77,8 @@ public class TestDefaultHoodieRecordPayload {
     assertEquals(payload1.preCombine(payload2, props), payload2);
     assertEquals(payload2.preCombine(payload1, props), payload2);
 
-    assertEquals(record1, payload1.getInsertValue(schema).get());
-    assertEquals(record2, payload2.getInsertValue(schema).get());
+    assertEquals(record1, payload1.getInsertValue(schema, props).get());
+    assertEquals(record2, payload2.getInsertValue(schema, props).get());
 
     assertEquals(payload1.combineAndGetUpdateValue(record2, schema, props).get(), record2);
     assertEquals(payload2.combineAndGetUpdateValue(record1, schema, props).get(), record2);
@@ -103,8 +103,8 @@ public class TestDefaultHoodieRecordPayload {
     assertEquals(payload1.preCombine(payload2, props), payload2);
     assertEquals(payload2.preCombine(payload1, props), payload2);
 
-    assertEquals(record1, payload1.getInsertValue(schema).get());
-    assertFalse(payload2.getInsertValue(schema).isPresent());
+    assertEquals(record1, payload1.getInsertValue(schema, props).get());
+    assertFalse(payload2.getInsertValue(schema, props).isPresent());
 
     assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema, props).get(), delRecord1);
     assertFalse(payload2.combineAndGetUpdateValue(record1, schema, props).isPresent());
@@ -142,4 +142,20 @@ public class TestDefaultHoodieRecordPayload {
     assertEquals(eventTime,
         Long.parseLong(payload2.getMetadata().get().get(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY)));
   }
+
+  @ParameterizedTest
+  @ValueSource(longs = {1L, 1612542030000L})
+  public void testGetEventTimeInMetadataForInserts(long eventTime) throws IOException {
+    GenericRecord record = new GenericData.Record(schema);
+
+    record.put("id", "1");
+    record.put("partition", "partition0");
+    record.put("ts", eventTime);
+    record.put("_hoodie_is_deleted", false);
+    DefaultHoodieRecordPayload payload = new DefaultHoodieRecordPayload(record, eventTime);
+    payload.getInsertValue(schema, props);
+    assertTrue(payload.getMetadata().isPresent());
+    assertEquals(eventTime,
+        Long.parseLong(payload.getMetadata().get().get(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY)));
+  }
 }