You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/08/23 11:57:03 UTC

[hudi] branch master updated: [HUDI-1150] Fix unable to parse input partition field :1 exception when using TimestampBasedKeyGenerator(#1920)

This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 35b2185  [HUDI-1150] Fix unable to parse input partition field :1 exception when using TimestampBasedKeyGenerator(#1920)
35b2185 is described below

commit 35b21855da209c812e006c1afff3d940d5ac2a18
Author: Mathieu <wx...@126.com>
AuthorDate: Sun Aug 23 19:56:50 2020 +0800

    [HUDI-1150] Fix unable to parse input partition field :1 exception when using TimestampBasedKeyGenerator(#1920)
---
 .../main/java/org/apache/hudi/DataSourceUtils.java |  6 ++--
 .../apache/hudi/keygen/RowKeyGeneratorHelper.java  |  2 +-
 .../hudi/keygen/TimestampBasedKeyGenerator.java    | 38 +++++++++++++++++---
 ...rser.java => AbstractHoodieDateTimeParser.java} | 40 +++++++++++++++++-----
 .../keygen/parser/HoodieDateTimeParserImpl.java    | 17 +++------
 .../keygen/TestTimestampBasedKeyGenerator.java     | 39 +++++++++++++++++++--
 6 files changed, 109 insertions(+), 33 deletions(-)

diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index ea2cc5c..19316d5 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -39,7 +39,7 @@ import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.KeyGenerator;
-import org.apache.hudi.keygen.parser.HoodieDateTimeParser;
+import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.avro.LogicalTypes;
@@ -172,9 +172,9 @@ public class DataSourceUtils {
   /**
    * Create a date time parser class for TimestampBasedKeyGenerator, passing in any configs needed.
    */
-  public static HoodieDateTimeParser createDateTimeParser(TypedProperties props, String parserClass) throws IOException {
+  public static AbstractHoodieDateTimeParser createDateTimeParser(TypedProperties props, String parserClass) throws IOException {
     try {
-      return (HoodieDateTimeParser) ReflectionUtils.loadClass(parserClass, props);
+      return (AbstractHoodieDateTimeParser) ReflectionUtils.loadClass(parserClass, props);
     } catch (Throwable e) {
       throw new IOException("Could not load date time parser class " + parserClass, e);
     }
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
index 02b8492..4c05489 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
@@ -146,7 +146,7 @@ public class RowKeyGeneratorHelper {
         }
         valueToProcess = (Row) valueToProcess.get(positions.get(index));
       } else { // last index
-        if (valueToProcess.getAs(positions.get(index)).toString().isEmpty()) {
+        if (null != valueToProcess.getAs(positions.get(index)) && valueToProcess.getAs(positions.get(index)).toString().isEmpty()) {
           toReturn = EMPTY_RECORDKEY_PLACEHOLDER;
           break;
         }
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
index 25a52fe..97a7d2e 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
@@ -26,7 +26,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieDeltaStreamerException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hudi.keygen.parser.HoodieDateTimeParser;
+import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser;
 import org.apache.hudi.keygen.parser.HoodieDateTimeParserImpl;
 
 import org.apache.avro.generic.GenericRecord;
@@ -41,6 +41,7 @@ import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
+import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -63,10 +64,11 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
   private final String outputDateFormat;
   private transient Option<DateTimeFormatter> inputFormatter;
   private transient DateTimeFormatter partitionFormatter;
-  private final HoodieDateTimeParser parser;
+  private final AbstractHoodieDateTimeParser parser;
 
   // TimeZone detailed settings reference
   // https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html
+  private final DateTimeZone inputDateTimeZone;
   private final DateTimeZone outputDateTimeZone;
 
   protected final boolean encodePartitionPath;
@@ -107,6 +109,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
     super(config, recordKeyField, partitionPathField);
     String dateTimeParserClass = config.getString(Config.DATE_TIME_PARSER_PROP, HoodieDateTimeParserImpl.class.getName());
     this.parser = DataSourceUtils.createDateTimeParser(config, dateTimeParserClass);
+    this.inputDateTimeZone = parser.getInputDateTimeZone();
     this.outputDateTimeZone = parser.getOutputDateTimeZone();
     this.outputDateFormat = parser.getOutputDateFormat();
     this.timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
@@ -133,7 +136,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
   public String getPartitionPath(GenericRecord record) {
     Object partitionVal = HoodieAvroUtils.getNestedFieldVal(record, getPartitionPathFields().get(0), true);
     if (partitionVal == null) {
-      partitionVal = 1L;
+      partitionVal = getDefaultPartitionVal();
     }
     try {
       return getPartitionPath(partitionVal);
@@ -143,6 +146,31 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
   }
 
   /**
+   * Set default value to partitionVal if the input value of partitionPathField is null.
+   */
+  private Object getDefaultPartitionVal() {
+    Object result = 1L;
+    if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
+      // since partitionVal is null, we can set a default value of any format as TIMESTAMP_INPUT_DATE_FORMAT_PROP
+      // configured, here we take the first.
+      // {Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP} won't be null, it has been checked in the initialization process of
+      // inputFormatter
+      String delimiter = parser.getConfigInputDateFormatDelimiter();
+      String format = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "").split(delimiter)[0];
+
+      // if both input and output timeZone are not configured, use GMT.
+      if (null != inputDateTimeZone) {
+        return new DateTime(result, inputDateTimeZone).toString(format);
+      } else if (null != outputDateTimeZone) {
+        return new DateTime(result, outputDateTimeZone).toString(format);
+      } else {
+        return new DateTime(result, DateTimeZone.forTimeZone(TimeZone.getTimeZone("GMT"))).toString(format);
+      }
+    }
+    return result;
+  }
+
+  /**
    * The function takes care of lazily initialising dateTimeFormatter variables only once.
    */
   private void initIfNeeded() {
@@ -219,9 +247,9 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
     buildFieldPositionMapIfNeeded(row.schema());
     Object partitionPathFieldVal =  RowKeyGeneratorHelper.getNestedFieldVal(row, partitionPathPositions.get(getPartitionPathFields().get(0)));
     try {
-      if (partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER)
+      if (partitionPathFieldVal == null || partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER)
           || partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
-        fieldVal = 1L;
+        fieldVal = getDefaultPartitionVal();
       } else {
         fieldVal = partitionPathFieldVal;
       }
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java
similarity index 52%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java
rename to hudi-spark/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java
index 6612f4c..80e26cc 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java
@@ -17,35 +17,57 @@
 
 package org.apache.hudi.keygen.parser;
 
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormatter;
 
 import java.io.Serializable;
 
-public interface HoodieDateTimeParser extends Serializable {
+public abstract class AbstractHoodieDateTimeParser implements Serializable {
+
+  protected final TypedProperties config;
+  protected final String configInputDateFormatDelimiter;
+
+  public AbstractHoodieDateTimeParser(TypedProperties config) {
+    this.config = config;
+    this.configInputDateFormatDelimiter = initInputDateFormatDelimiter();
+  }
+
+  private String initInputDateFormatDelimiter() {
+    String inputDateFormatDelimiter = config.getString(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, ",").trim();
+    inputDateFormatDelimiter = inputDateFormatDelimiter.isEmpty() ? "," : inputDateFormatDelimiter;
+    return inputDateFormatDelimiter;
+  }
 
   /**
    * Returns the output date format in which the partition paths will be created for the hudi dataset.
-   * @return
    */
-  String getOutputDateFormat();
+  public String getOutputDateFormat() {
+    return config.getString(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP);
+  }
 
   /**
    * Returns input formats in which datetime based values might be coming in incoming records.
-   * @return
    */
-  Option<DateTimeFormatter> getInputFormatter();
+  public abstract Option<DateTimeFormatter> getInputFormatter();
 
   /**
    * Returns the datetime zone one should expect the incoming values into.
-   * @return
    */
-  DateTimeZone getInputDateTimeZone();
+  public abstract DateTimeZone getInputDateTimeZone();
 
   /**
    * Returns the datetime zone using which the final partition paths for hudi dataset are created.
-   * @return
    */
-  DateTimeZone getOutputDateTimeZone();
+  public abstract DateTimeZone getOutputDateTimeZone();
+
+  /**
+   * Returns the input date format delimiter, comma by default.
+   */
+  public String getConfigInputDateFormatDelimiter() {
+    return this.configInputDateFormatDelimiter;
+  }
+
 }
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
index 11790cb..41452d0 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
@@ -28,32 +28,22 @@ import org.joda.time.format.DateTimeFormatter;
 import org.joda.time.format.DateTimeFormatterBuilder;
 import org.joda.time.format.DateTimeParser;
 
-import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.TimeZone;
 
-public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializable {
+public class HoodieDateTimeParserImpl extends AbstractHoodieDateTimeParser {
 
   private String configInputDateFormatList;
-  private final String configInputDateFormatDelimiter;
-  private final TypedProperties config;
 
   // TimeZone detailed settings reference
   // https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html
   private final DateTimeZone inputDateTimeZone;
 
   public HoodieDateTimeParserImpl(TypedProperties config) {
-    this.config = config;
+    super(config);
     DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP));
     this.inputDateTimeZone = getInputDateTimeZone();
-    this.configInputDateFormatDelimiter = getConfigInputDateFormatDelimiter();
-  }
-
-  private String getConfigInputDateFormatDelimiter() {
-    String inputDateFormatDelimiter = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, ",").trim();
-    inputDateFormatDelimiter = inputDateFormatDelimiter.isEmpty() ? "," : inputDateFormatDelimiter;
-    return inputDateFormatDelimiter;
   }
 
   private DateTimeFormatter getInputDateFormatter() {
@@ -65,7 +55,7 @@ public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializa
         .append(
         null,
         Arrays.stream(
-          this.configInputDateFormatList.split(this.configInputDateFormatDelimiter))
+          this.configInputDateFormatList.split(super.configInputDateFormatDelimiter))
           .map(String::trim)
           .map(DateTimeFormat::forPattern)
           .map(DateTimeFormatter::getParser)
@@ -119,4 +109,5 @@ public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializa
     }
     return !outputTimeZone.trim().isEmpty() ? DateTimeZone.forTimeZone(TimeZone.getTimeZone(outputTimeZone)) : null;
   }
+
 }
diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
index 6afc6eb..7867415 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
+++ b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
@@ -150,6 +150,29 @@ public class TestTimestampBasedKeyGenerator {
 
     // test w/ Row
     assertEquals("2020-01-06 12", keyGen.getPartitionPath(baseRow));
+
+    // timezone is GMT+8:00, createTime is null
+    baseRecord.put("createTime", null);
+    properties = getBaseKeyConfig("EPOCHMILLISECONDS", "yyyy-MM-dd hh", "GMT+8:00", null);
+    keyGen = new TimestampBasedKeyGenerator(properties);
+    HoodieKey hk5 = keyGen.getKey(baseRecord);
+    assertEquals("1970-01-01 08", hk5.getPartitionPath());
+
+    // test w/ Row
+    baseRow = genericRecordToRow(baseRecord);
+    assertEquals("1970-01-01 08", keyGen.getPartitionPath(baseRow));
+
+    // timestamp is DATE_STRING, timezone is GMT, createTime is null
+    baseRecord.put("createTime", null);
+    properties = getBaseKeyConfig("DATE_STRING", "yyyy-MM-dd hh:mm:ss", "GMT", null);
+    properties.setProperty("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd hh:mm:ss");
+    keyGen = new TimestampBasedKeyGenerator(properties);
+    HoodieKey hk6 = keyGen.getKey(baseRecord);
+    assertEquals("1970-01-01 12:00:00", hk6.getPartitionPath());
+
+    // test w/ Row
+    baseRow = genericRecordToRow(baseRecord);
+    assertEquals("1970-01-01 12:00:00", keyGen.getPartitionPath(baseRow));
   }
 
   @Test
@@ -160,12 +183,24 @@ public class TestTimestampBasedKeyGenerator {
     // timezone is GMT
     properties = getBaseKeyConfig("SCALAR", "yyyy-MM-dd hh", "GMT", "days");
     TimestampBasedKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
-    HoodieKey hk5 = keyGen.getKey(baseRecord);
-    assertEquals(hk5.getPartitionPath(), "2024-10-04 12");
+    HoodieKey hk1 = keyGen.getKey(baseRecord);
+    assertEquals(hk1.getPartitionPath(), "2024-10-04 12");
 
     // test w/ Row
     baseRow = genericRecordToRow(baseRecord);
     assertEquals("2024-10-04 12", keyGen.getPartitionPath(baseRow));
+
+    // timezone is GMT, createTime is null
+    baseRecord.put("createTime", null);
+    properties = getBaseKeyConfig("SCALAR", "yyyy-MM-dd hh", "GMT", "days");
+    keyGen = new TimestampBasedKeyGenerator(properties);
+    HoodieKey hk2 = keyGen.getKey(baseRecord);
+    assertEquals("1970-01-02 12", hk2.getPartitionPath());
+
+    // test w/ Row
+    baseRow = genericRecordToRow(baseRecord);
+    assertEquals("1970-01-02 12", keyGen.getPartitionPath(baseRow));
+
   }
 
   @Test