You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/05/14 20:38:09 UTC

[incubator-hudi] branch master updated: [HUDI-843] Add ability to specify time unit for TimestampBasedKeyGenerator (#1541)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f094f42  [HUDI-843] Add ability to specify time unit for  TimestampBasedKeyGenerator (#1541)
f094f42 is described below

commit f094f4285782e1253b52fef788fa7461c67d95a7
Author: Alexander Filipchik <af...@gmail.com>
AuthorDate: Thu May 14 13:37:59 2020 -0700

    [HUDI-843] Add ability to specify time unit for  TimestampBasedKeyGenerator (#1541)
    
    
    
    Co-authored-by: Alex Filipchik <al...@csscompany.com>
    Co-authored-by: Vinoth Chandar <vi...@apache.org>
---
 .../keygen/TimestampBasedKeyGenerator.java         | 47 ++++++++++++++++++----
 .../keygen/TestTimestampBasedKeyGenerator.java     | 26 +++++++++---
 2 files changed, 60 insertions(+), 13 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java
index 919a2ef..e5bdc64 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/keygen/TimestampBasedKeyGenerator.java
@@ -35,6 +35,10 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 
 /**
  * Key generator, that relies on timestamps for partitioning field. Still picks record key by name.
@@ -42,9 +46,11 @@ import java.util.TimeZone;
 public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
 
   enum TimestampType implements Serializable {
-    UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS
+    UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, SCALAR
   }
 
+  private final TimeUnit timeUnit;
+
   private final TimestampType timestampType;
 
   private SimpleDateFormat inputDateFormat;
@@ -62,6 +68,8 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
 
     // One value from TimestampType above
     private static final String TIMESTAMP_TYPE_FIELD_PROP = "hoodie.deltastreamer.keygen.timebased.timestamp.type";
+    private static final String INPUT_TIME_UNIT =
+        "hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit";
     private static final String TIMESTAMP_INPUT_DATE_FORMAT_PROP =
         "hoodie.deltastreamer.keygen.timebased.input.dateformat";
     private static final String TIMESTAMP_OUTPUT_DATE_FORMAT_PROP =
@@ -84,6 +92,21 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
       this.inputDateFormat = new SimpleDateFormat(config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
       this.inputDateFormat.setTimeZone(timeZone);
     }
+
+    switch (this.timestampType) {
+      case EPOCHMILLISECONDS:
+        timeUnit = MILLISECONDS;
+        break;
+      case UNIX_TIMESTAMP:
+        timeUnit = SECONDS;
+        break;
+      case SCALAR:
+        String timeUnitStr = config.getString(Config.INPUT_TIME_UNIT, TimeUnit.SECONDS.toString());
+        timeUnit = TimeUnit.valueOf(timeUnitStr.toUpperCase());
+        break;
+      default:
+        timeUnit = null;
+    }
   }
 
   @Override
@@ -96,21 +119,20 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
     partitionPathFormat.setTimeZone(timeZone);
 
     try {
-      long unixTime;
+      long timeMs;
       if (partitionVal instanceof Double) {
-        unixTime = ((Double) partitionVal).longValue();
+        timeMs = convertLongTimeToMillis(((Double) partitionVal).longValue());
       } else if (partitionVal instanceof Float) {
-        unixTime = ((Float) partitionVal).longValue();
+        timeMs = convertLongTimeToMillis(((Float) partitionVal).longValue());
       } else if (partitionVal instanceof Long) {
-        unixTime = (Long) partitionVal;
+        timeMs = convertLongTimeToMillis((Long) partitionVal);
       } else if (partitionVal instanceof CharSequence) {
-        unixTime = inputDateFormat.parse(partitionVal.toString()).getTime() / 1000;
+        timeMs = inputDateFormat.parse(partitionVal.toString()).getTime();
       } else {
         throw new HoodieNotSupportedException(
           "Unexpected type for partition field: " + partitionVal.getClass().getName());
       }
-      Date timestamp = this.timestampType == TimestampType.EPOCHMILLISECONDS ? new Date(unixTime) : new Date(unixTime * 1000);
-
+      Date timestamp = new Date(timeMs);
       String recordKey = DataSourceUtils.getNestedFieldValAsString(record, recordKeyField, true);
       if (recordKey == null || recordKey.isEmpty()) {
         throw new HoodieKeyException("recordKey value: \"" + recordKey + "\" for field: \"" + recordKeyField + "\" cannot be null or empty.");
@@ -123,4 +145,13 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
       throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, pe);
     }
   }
+
+  private long convertLongTimeToMillis(Long partitionVal) {
+    if (timeUnit == null) {
+      // timeUnit is null for DATE_STRING and MIXED, which define no unit for numeric partition values
+      throw new RuntimeException(Config.INPUT_TIME_UNIT + " is not specified but scalar is supplied as time value");
+    }
+
+    return MILLISECONDS.convert(partitionVal, timeUnit);
+  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/keygen/TestTimestampBasedKeyGenerator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/keygen/TestTimestampBasedKeyGenerator.java
index ba090a8..05cb6f9 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/keygen/TestTimestampBasedKeyGenerator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/keygen/TestTimestampBasedKeyGenerator.java
@@ -47,10 +47,15 @@ public class TestTimestampBasedKeyGenerator {
     properties.setProperty(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "false");
   }
   
-  private TypedProperties getBaseKeyConfig(String timestampType, String dateFormat, String timezone) {
+  private TypedProperties getBaseKeyConfig(String timestampType, String dateFormat, String timezone, String scalarType) {
     properties.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.type", timestampType);
     properties.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", dateFormat);
     properties.setProperty("hoodie.deltastreamer.keygen.timebased.timezone", timezone);
+
+    if (scalarType != null) {
+      properties.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit", scalarType);
+    }
+
     return properties;
   }
 
@@ -58,25 +63,36 @@ public class TestTimestampBasedKeyGenerator {
   public void testTimestampBasedKeyGenerator() {
     // timezone is GMT+8:00
     baseRecord.put("createTime", 1578283932000L);
-    properties = getBaseKeyConfig("EPOCHMILLISECONDS", "yyyy-MM-dd hh", "GMT+8:00");
+    properties = getBaseKeyConfig("EPOCHMILLISECONDS", "yyyy-MM-dd hh", "GMT+8:00", null);
     HoodieKey hk1 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord);
     assertEquals("2020-01-06 12", hk1.getPartitionPath());
 
     // timezone is GMT
-    properties = getBaseKeyConfig("EPOCHMILLISECONDS", "yyyy-MM-dd hh", "GMT");
+    properties = getBaseKeyConfig("EPOCHMILLISECONDS", "yyyy-MM-dd hh", "GMT", null);
     HoodieKey hk2 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord);
     assertEquals("2020-01-06 04", hk2.getPartitionPath());
 
     // timestamp is DATE_STRING, timezone is GMT+8:00
     baseRecord.put("createTime", "2020-01-06 12:12:12");
-    properties = getBaseKeyConfig("DATE_STRING", "yyyy-MM-dd hh", "GMT+8:00");
+    properties = getBaseKeyConfig("DATE_STRING", "yyyy-MM-dd hh", "GMT+8:00", null);
     properties.setProperty("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd hh:mm:ss");
     HoodieKey hk3 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord);
     assertEquals("2020-01-06 12", hk3.getPartitionPath());
 
     // timezone is GMT
-    properties = getBaseKeyConfig("DATE_STRING", "yyyy-MM-dd hh", "GMT");
+    properties = getBaseKeyConfig("DATE_STRING", "yyyy-MM-dd hh", "GMT", null);
     HoodieKey hk4 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord);
     assertEquals("2020-01-06 12", hk4.getPartitionPath());
   }
+
+  @Test
+  public void testScalar() {
+    // scalar input: 20000 days since the epoch, i.e. 2024-10-04T00:00:00 GMT
+    baseRecord.put("createTime", 20000L);
+
+    // timezone is GMT; "hh" is the 12-hour clock, so midnight is rendered as "12"
+    properties = getBaseKeyConfig("SCALAR", "yyyy-MM-dd hh", "GMT", "days");
+    HoodieKey hk5 = new TimestampBasedKeyGenerator(properties).getKey(baseRecord);
+    assertEquals("2024-10-04 12", hk5.getPartitionPath());
+  }
 }