You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/04 05:53:08 UTC

[hudi] 03/07: [HUDI-5551] support seconds unit on event_time metrics (#7664)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 46aa15bd564e823ca6984992a110534de9611ece
Author: 苏承祥 <11...@qq.com>
AuthorDate: Fri Feb 3 16:02:41 2023 +0800

    [HUDI-5551] support seconds unit on event_time metrics (#7664)
    
    * event_time support seconds
    
    Co-authored-by: 苏承祥 <su...@tuya.com>
---
 .../java/org/apache/hudi/client/WriteStatus.java   | 13 ++-
 .../org/apache/hudi/client/TestWriteStatus.java    | 92 ++++++++++++++++++++++
 2 files changed, 104 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
index b306d6c5400..54e88fcca22 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
@@ -101,7 +101,18 @@ public class WriteStatus implements Serializable {
       String eventTimeVal = optionalRecordMetadata.get().getOrDefault(METADATA_EVENT_TIME_KEY, null);
       try {
         if (!StringUtils.isNullOrEmpty(eventTimeVal)) {
-          long eventTime = DateTimeUtils.parseDateTime(eventTimeVal).toEpochMilli();
+          int length = eventTimeVal.length();
+          long millisEventTime;
+          // eventTimeVal in seconds unit
+          if (length == 10) {
+            millisEventTime = Long.parseLong(eventTimeVal) * 1000;
+          } else if (length == 13) {
+            // eventTimeVal in millis unit
+            millisEventTime = Long.parseLong(eventTimeVal);
+          } else {
+            throw new IllegalArgumentException("not support event_time format:" + eventTimeVal);
+          }
+          long eventTime = DateTimeUtils.parseDateTime(Long.toString(millisEventTime)).toEpochMilli();
           stat.setMinEventTime(eventTime);
           stat.setMaxEventTime(eventTime);
         }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
index 78e711ed701..99fb76650f9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
@@ -18,12 +18,19 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.util.Option;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
@@ -53,4 +60,89 @@ public class TestWriteStatus {
     assertTrue(status.getWrittenRecords().isEmpty());
     assertEquals(2000, status.getTotalRecords());
   }
+
+  @Test
+  public void testSuccessWithEventTime() {
+    // Case 1 - empty event-time string: all records are counted, no error is flagged, min/max event time stay null.
+    WriteStatus status = new WriteStatus(false, 1.0);
+    status.setStat(new HoodieWriteStat());
+    for (int i = 0; i < 1000; i++) {
+      Map<String, String> metadata = new HashMap<>();
+      metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, "");
+      status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+    }
+    assertEquals(1000, status.getTotalRecords());
+    assertFalse(status.hasErrors());
+    assertNull(status.getStat().getMaxEventTime());
+    assertNull(status.getStat().getMinEventTime());
+
+    // Case 2 - null event-time value: same expectations as the empty-string case.
+    status = new WriteStatus(false, 1.0);
+    status.setStat(new HoodieWriteStat());
+    for (int i = 0; i < 1000; i++) {
+      Map<String, String> metadata = new HashMap<>();
+      metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, null);
+      status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+    }
+    assertEquals(1000, status.getTotalRecords());
+    assertFalse(status.hasErrors());
+    assertNull(status.getStat().getMaxEventTime());
+    assertNull(status.getStat().getMinEventTime());
+
+    // Case 3 - event time given in epoch seconds: the stat is expected to report it in millis (seconds * 1000).
+    status = new WriteStatus(false, 1.0);
+    status.setStat(new HoodieWriteStat());
+    long minSeconds = 0L;
+    long maxSeconds = 0L;
+    for (int i = 0; i < 1000; i++) {
+      Map<String, String> metadata = new HashMap<>();
+      long eventTime = System.currentTimeMillis() / 1000;
+      if (i == 0) { // the first sample is taken as the expected minimum
+        minSeconds = eventTime;
+      } else if (i == 999) { // the last sample is taken as the expected maximum
+        maxSeconds = eventTime;
+      } // NOTE(review): this relies on the wall clock not going backwards during the loop - confirm acceptable
+      metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, String.valueOf(eventTime));
+      status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+    }
+    assertEquals(1000, status.getTotalRecords());
+    assertFalse(status.hasErrors());
+    assertEquals(maxSeconds * 1000L, status.getStat().getMaxEventTime());
+    assertEquals(minSeconds * 1000L, status.getStat().getMinEventTime());
+
+    // Case 4 - event time given in epoch millis: the stat is expected to report the value unchanged.
+    status = new WriteStatus(false, 1.0);
+    status.setStat(new HoodieWriteStat());
+    minSeconds = 0L; // reused from the previous case; holds epoch millis here despite the name
+    maxSeconds = 0L; // reused from the previous case; holds epoch millis here despite the name
+    for (int i = 0; i < 1000; i++) {
+      Map<String, String> metadata = new HashMap<>();
+      long eventTime = System.currentTimeMillis();
+      if (i == 0) { // the first sample is taken as the expected minimum
+        minSeconds = eventTime;
+      } else if (i == 999) { // the last sample is taken as the expected maximum
+        maxSeconds = eventTime;
+      }
+      metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, String.valueOf(eventTime));
+      status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+    }
+    assertEquals(1000, status.getTotalRecords());
+    assertFalse(status.hasErrors());
+    assertEquals(maxSeconds, status.getStat().getMaxEventTime());
+    assertEquals(minSeconds, status.getStat().getMinEventTime());
+
+    // Case 5 - unsupported format (the strings "0".."999"): records are still counted, no error is flagged, min/max stay null.
+    status = new WriteStatus(false, 1.0);
+    status.setStat(new HoodieWriteStat());
+    for (int i = 0; i < 1000; i++) {
+      Map<String, String> metadata = new HashMap<>();
+      metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, String.valueOf(i));
+      status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+    }
+    assertEquals(1000, status.getTotalRecords());
+    assertFalse(status.hasErrors());
+    assertNull(status.getStat().getMaxEventTime());
+    assertNull(status.getStat().getMinEventTime());
+
+  }
 }