You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this copy.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/04 05:53:08 UTC
[hudi] 03/07: [HUDI-5551] support seconds unit on event_time metrics (#7664)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 46aa15bd564e823ca6984992a110534de9611ece
Author: 苏承祥 <11...@qq.com>
AuthorDate: Fri Feb 3 16:02:41 2023 +0800
[HUDI-5551] support seconds unit on event_time metrics (#7664)
* event_time supports seconds unit
Co-authored-by: 苏承祥 <su...@tuya.com>
---
.../java/org/apache/hudi/client/WriteStatus.java | 13 ++-
.../org/apache/hudi/client/TestWriteStatus.java | 92 ++++++++++++++++++++++
2 files changed, 104 insertions(+), 1 deletion(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
index b306d6c5400..54e88fcca22 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
@@ -101,7 +101,18 @@ public class WriteStatus implements Serializable {
String eventTimeVal = optionalRecordMetadata.get().getOrDefault(METADATA_EVENT_TIME_KEY, null);
try {
if (!StringUtils.isNullOrEmpty(eventTimeVal)) {
- long eventTime = DateTimeUtils.parseDateTime(eventTimeVal).toEpochMilli();
+ int length = eventTimeVal.length();
+ long millisEventTime;
+ // eventTimeVal in seconds unit
+ if (length == 10) {
+ millisEventTime = Long.parseLong(eventTimeVal) * 1000;
+ } else if (length == 13) {
+ // eventTimeVal in millis unit
+ millisEventTime = Long.parseLong(eventTimeVal);
+ } else {
+ throw new IllegalArgumentException("not support event_time format:" + eventTimeVal);
+ }
+ long eventTime = DateTimeUtils.parseDateTime(Long.toString(millisEventTime)).toEpochMilli();
stat.setMinEventTime(eventTime);
stat.setMaxEventTime(eventTime);
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
index 78e711ed701..99fb76650f9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
@@ -18,12 +18,19 @@
package org.apache.hudi.client;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.Option;
import org.junit.jupiter.api.Test;
+import java.util.HashMap;
+import java.util.Map;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -53,4 +60,89 @@ public class TestWriteStatus {
assertTrue(status.getWrittenRecords().isEmpty());
assertEquals(2000, status.getTotalRecords());
}
+
+ @Test
+ public void testSuccessWithEventTime() {
+ // Checks min/max event-time tracking in WriteStatus.markSuccess; case 1: an empty eventTime leaves both unset
+ WriteStatus status = new WriteStatus(false, 1.0);
+ status.setStat(new HoodieWriteStat());
+ for (int i = 0; i < 1000; i++) {
+ Map<String, String> metadata = new HashMap<>();
+ metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, "");
+ status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+ }
+ assertEquals(1000, status.getTotalRecords());
+ assertFalse(status.hasErrors());
+ assertNull(status.getStat().getMaxEventTime());
+ assertNull(status.getStat().getMinEventTime());
+
+ // case 2: a null eventTime also leaves min/max event time unset and records no errors
+ status = new WriteStatus(false, 1.0);
+ status.setStat(new HoodieWriteStat());
+ for (int i = 0; i < 1000; i++) {
+ Map<String, String> metadata = new HashMap<>();
+ metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, null);
+ status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+ }
+ assertEquals(1000, status.getTotalRecords());
+ assertFalse(status.hasErrors());
+ assertNull(status.getStat().getMaxEventTime());
+ assertNull(status.getStat().getMinEventTime());
+
+ // case 3: 10-digit epoch-seconds values are expected back as millis (seconds * 1000); NOTE(review): assumes the clock never goes backwards
+ status = new WriteStatus(false, 1.0);
+ status.setStat(new HoodieWriteStat());
+ long minSeconds = 0L;
+ long maxSeconds = 0L;
+ for (int i = 0; i < 1000; i++) {
+ Map<String, String> metadata = new HashMap<>();
+ long eventTime = System.currentTimeMillis() / 1000;
+ if (i == 0) {
+ minSeconds = eventTime;
+ } else if (i == 999) {
+ maxSeconds = eventTime;
+ }
+ metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, String.valueOf(eventTime));
+ status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+ }
+ assertEquals(1000, status.getTotalRecords());
+ assertFalse(status.hasErrors());
+ assertEquals(maxSeconds * 1000L, status.getStat().getMaxEventTime());
+ assertEquals(minSeconds * 1000L, status.getStat().getMinEventTime());
+
+ // case 4: 13-digit epoch-millis values are expected back unchanged (minSeconds/maxSeconds actually hold millis here)
+ status = new WriteStatus(false, 1.0);
+ status.setStat(new HoodieWriteStat());
+ minSeconds = 0L;
+ maxSeconds = 0L;
+ for (int i = 0; i < 1000; i++) {
+ Map<String, String> metadata = new HashMap<>();
+ long eventTime = System.currentTimeMillis();
+ if (i == 0) {
+ minSeconds = eventTime;
+ } else if (i == 999) {
+ maxSeconds = eventTime;
+ }
+ metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, String.valueOf(eventTime));
+ status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+ }
+ assertEquals(1000, status.getTotalRecords());
+ assertFalse(status.hasErrors());
+ assertEquals(maxSeconds, status.getStat().getMaxEventTime());
+ assertEquals(minSeconds, status.getStat().getMinEventTime());
+
+ // case 5: values of unsupported length (here 1-3 digits) are skipped: no errors recorded and min/max stay unset
+ status = new WriteStatus(false, 1.0);
+ status.setStat(new HoodieWriteStat());
+ for (int i = 0; i < 1000; i++) {
+ Map<String, String> metadata = new HashMap<>();
+ metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, String.valueOf(i));
+ status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+ }
+ assertEquals(1000, status.getTotalRecords());
+ assertFalse(status.hasErrors());
+ assertNull(status.getStat().getMaxEventTime());
+ assertNull(status.getStat().getMinEventTime());
+
+ }
}