You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink on the original page) is not preserved in this plain-text view.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/08/23 11:57:03 UTC
[hudi] branch master updated: [HUDI-1150] Fix unable to parse input
partition field :1 exception when using TimestampBasedKeyGenerator(#1920)
This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 35b2185 [HUDI-1150] Fix unable to parse input partition field :1 exception when using TimestampBasedKeyGenerator(#1920)
35b2185 is described below
commit 35b21855da209c812e006c1afff3d940d5ac2a18
Author: Mathieu <wx...@126.com>
AuthorDate: Sun Aug 23 19:56:50 2020 +0800
[HUDI-1150] Fix unable to parse input partition field :1 exception when using TimestampBasedKeyGenerator(#1920)
---
.../main/java/org/apache/hudi/DataSourceUtils.java | 6 ++--
.../apache/hudi/keygen/RowKeyGeneratorHelper.java | 2 +-
.../hudi/keygen/TimestampBasedKeyGenerator.java | 38 +++++++++++++++++---
...rser.java => AbstractHoodieDateTimeParser.java} | 40 +++++++++++++++++-----
.../keygen/parser/HoodieDateTimeParserImpl.java | 17 +++------
.../keygen/TestTimestampBasedKeyGenerator.java | 39 +++++++++++++++++++--
6 files changed, 109 insertions(+), 33 deletions(-)
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index ea2cc5c..19316d5 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -39,7 +39,7 @@ import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.KeyGenerator;
-import org.apache.hudi.keygen.parser.HoodieDateTimeParser;
+import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.avro.LogicalTypes;
@@ -172,9 +172,9 @@ public class DataSourceUtils {
/**
* Create a date time parser class for TimestampBasedKeyGenerator, passing in any configs needed.
*/
- public static HoodieDateTimeParser createDateTimeParser(TypedProperties props, String parserClass) throws IOException {
+ public static AbstractHoodieDateTimeParser createDateTimeParser(TypedProperties props, String parserClass) throws IOException {
try {
- return (HoodieDateTimeParser) ReflectionUtils.loadClass(parserClass, props);
+ return (AbstractHoodieDateTimeParser) ReflectionUtils.loadClass(parserClass, props);
} catch (Throwable e) {
throw new IOException("Could not load date time parser class " + parserClass, e);
}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
index 02b8492..4c05489 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
@@ -146,7 +146,7 @@ public class RowKeyGeneratorHelper {
}
valueToProcess = (Row) valueToProcess.get(positions.get(index));
} else { // last index
- if (valueToProcess.getAs(positions.get(index)).toString().isEmpty()) {
+ if (null != valueToProcess.getAs(positions.get(index)) && valueToProcess.getAs(positions.get(index)).toString().isEmpty()) {
toReturn = EMPTY_RECORDKEY_PLACEHOLDER;
break;
}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
index 25a52fe..97a7d2e 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
@@ -26,7 +26,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieDeltaStreamerException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hudi.keygen.parser.HoodieDateTimeParser;
+import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser;
import org.apache.hudi.keygen.parser.HoodieDateTimeParserImpl;
import org.apache.avro.generic.GenericRecord;
@@ -41,6 +41,7 @@ import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
+import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -63,10 +64,11 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
private final String outputDateFormat;
private transient Option<DateTimeFormatter> inputFormatter;
private transient DateTimeFormatter partitionFormatter;
- private final HoodieDateTimeParser parser;
+ private final AbstractHoodieDateTimeParser parser;
// TimeZone detailed settings reference
// https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html
+ private final DateTimeZone inputDateTimeZone;
private final DateTimeZone outputDateTimeZone;
protected final boolean encodePartitionPath;
@@ -107,6 +109,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
super(config, recordKeyField, partitionPathField);
String dateTimeParserClass = config.getString(Config.DATE_TIME_PARSER_PROP, HoodieDateTimeParserImpl.class.getName());
this.parser = DataSourceUtils.createDateTimeParser(config, dateTimeParserClass);
+ this.inputDateTimeZone = parser.getInputDateTimeZone();
this.outputDateTimeZone = parser.getOutputDateTimeZone();
this.outputDateFormat = parser.getOutputDateFormat();
this.timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
@@ -133,7 +136,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
public String getPartitionPath(GenericRecord record) {
Object partitionVal = HoodieAvroUtils.getNestedFieldVal(record, getPartitionPathFields().get(0), true);
if (partitionVal == null) {
- partitionVal = 1L;
+ partitionVal = getDefaultPartitionVal();
}
try {
return getPartitionPath(partitionVal);
@@ -143,6 +146,31 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
}
/**
+ * Set default value to partitionVal if the input value of partitionPathField is null.
+ */
+ private Object getDefaultPartitionVal() {
+ Object result = 1L;
+ if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
+ // since partitionVal is null, we can set a default value of any format as TIMESTAMP_INPUT_DATE_FORMAT_PROP
+ // configured, here we take the first.
+ // {Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP} won't be null, it has been checked in the initialization process of
+ // inputFormatter
+ String delimiter = parser.getConfigInputDateFormatDelimiter();
+ String format = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "").split(delimiter)[0];
+
+ // if both input and output timeZone are not configured, use GMT.
+ if (null != inputDateTimeZone) {
+ return new DateTime(result, inputDateTimeZone).toString(format);
+ } else if (null != outputDateTimeZone) {
+ return new DateTime(result, outputDateTimeZone).toString(format);
+ } else {
+ return new DateTime(result, DateTimeZone.forTimeZone(TimeZone.getTimeZone("GMT"))).toString(format);
+ }
+ }
+ return result;
+ }
+
+ /**
* The function takes care of lazily initialising dateTimeFormatter variables only once.
*/
private void initIfNeeded() {
@@ -219,9 +247,9 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
buildFieldPositionMapIfNeeded(row.schema());
Object partitionPathFieldVal = RowKeyGeneratorHelper.getNestedFieldVal(row, partitionPathPositions.get(getPartitionPathFields().get(0)));
try {
- if (partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER)
+ if (partitionPathFieldVal == null || partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER)
|| partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
- fieldVal = 1L;
+ fieldVal = getDefaultPartitionVal();
} else {
fieldVal = partitionPathFieldVal;
}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java
similarity index 52%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java
rename to hudi-spark/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java
index 6612f4c..80e26cc 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java
@@ -17,35 +17,57 @@
package org.apache.hudi.keygen.parser;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;
import java.io.Serializable;
-public interface HoodieDateTimeParser extends Serializable {
+public abstract class AbstractHoodieDateTimeParser implements Serializable {
+
+ protected final TypedProperties config;
+ protected final String configInputDateFormatDelimiter;
+
+ public AbstractHoodieDateTimeParser(TypedProperties config) {
+ this.config = config;
+ this.configInputDateFormatDelimiter = initInputDateFormatDelimiter();
+ }
+
+ private String initInputDateFormatDelimiter() {
+ String inputDateFormatDelimiter = config.getString(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, ",").trim();
+ inputDateFormatDelimiter = inputDateFormatDelimiter.isEmpty() ? "," : inputDateFormatDelimiter;
+ return inputDateFormatDelimiter;
+ }
/**
* Returns the output date format in which the partition paths will be created for the hudi dataset.
- * @return
*/
- String getOutputDateFormat();
+ public String getOutputDateFormat() {
+ return config.getString(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP);
+ }
/**
* Returns input formats in which datetime based values might be coming in incoming records.
- * @return
*/
- Option<DateTimeFormatter> getInputFormatter();
+ public abstract Option<DateTimeFormatter> getInputFormatter();
/**
* Returns the datetime zone one should expect the incoming values into.
- * @return
*/
- DateTimeZone getInputDateTimeZone();
+ public abstract DateTimeZone getInputDateTimeZone();
/**
* Returns the datetime zone using which the final partition paths for hudi dataset are created.
- * @return
*/
- DateTimeZone getOutputDateTimeZone();
+ public abstract DateTimeZone getOutputDateTimeZone();
+
+ /**
+ * Returns the input date format delimiter, comma by default.
+ */
+ public String getConfigInputDateFormatDelimiter() {
+ return this.configInputDateFormatDelimiter;
+ }
+
}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
index 11790cb..41452d0 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
@@ -28,32 +28,22 @@ import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.DateTimeFormatterBuilder;
import org.joda.time.format.DateTimeParser;
-import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.TimeZone;
-public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializable {
+public class HoodieDateTimeParserImpl extends AbstractHoodieDateTimeParser {
private String configInputDateFormatList;
- private final String configInputDateFormatDelimiter;
- private final TypedProperties config;
// TimeZone detailed settings reference
// https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html
private final DateTimeZone inputDateTimeZone;
public HoodieDateTimeParserImpl(TypedProperties config) {
- this.config = config;
+ super(config);
DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP));
this.inputDateTimeZone = getInputDateTimeZone();
- this.configInputDateFormatDelimiter = getConfigInputDateFormatDelimiter();
- }
-
- private String getConfigInputDateFormatDelimiter() {
- String inputDateFormatDelimiter = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, ",").trim();
- inputDateFormatDelimiter = inputDateFormatDelimiter.isEmpty() ? "," : inputDateFormatDelimiter;
- return inputDateFormatDelimiter;
}
private DateTimeFormatter getInputDateFormatter() {
@@ -65,7 +55,7 @@ public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializa
.append(
null,
Arrays.stream(
- this.configInputDateFormatList.split(this.configInputDateFormatDelimiter))
+ this.configInputDateFormatList.split(super.configInputDateFormatDelimiter))
.map(String::trim)
.map(DateTimeFormat::forPattern)
.map(DateTimeFormatter::getParser)
@@ -119,4 +109,5 @@ public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializa
}
return !outputTimeZone.trim().isEmpty() ? DateTimeZone.forTimeZone(TimeZone.getTimeZone(outputTimeZone)) : null;
}
+
}
diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
index 6afc6eb..7867415 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
+++ b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
@@ -150,6 +150,29 @@ public class TestTimestampBasedKeyGenerator {
// test w/ Row
assertEquals("2020-01-06 12", keyGen.getPartitionPath(baseRow));
+
+ // timezone is GMT+8:00, createTime is null
+ baseRecord.put("createTime", null);
+ properties = getBaseKeyConfig("EPOCHMILLISECONDS", "yyyy-MM-dd hh", "GMT+8:00", null);
+ keyGen = new TimestampBasedKeyGenerator(properties);
+ HoodieKey hk5 = keyGen.getKey(baseRecord);
+ assertEquals("1970-01-01 08", hk5.getPartitionPath());
+
+ // test w/ Row
+ baseRow = genericRecordToRow(baseRecord);
+ assertEquals("1970-01-01 08", keyGen.getPartitionPath(baseRow));
+
+ // timestamp is DATE_STRING, timezone is GMT, createTime is null
+ baseRecord.put("createTime", null);
+ properties = getBaseKeyConfig("DATE_STRING", "yyyy-MM-dd hh:mm:ss", "GMT", null);
+ properties.setProperty("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd hh:mm:ss");
+ keyGen = new TimestampBasedKeyGenerator(properties);
+ HoodieKey hk6 = keyGen.getKey(baseRecord);
+ assertEquals("1970-01-01 12:00:00", hk6.getPartitionPath());
+
+ // test w/ Row
+ baseRow = genericRecordToRow(baseRecord);
+ assertEquals("1970-01-01 12:00:00", keyGen.getPartitionPath(baseRow));
}
@Test
@@ -160,12 +183,24 @@ public class TestTimestampBasedKeyGenerator {
// timezone is GMT
properties = getBaseKeyConfig("SCALAR", "yyyy-MM-dd hh", "GMT", "days");
TimestampBasedKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
- HoodieKey hk5 = keyGen.getKey(baseRecord);
- assertEquals(hk5.getPartitionPath(), "2024-10-04 12");
+ HoodieKey hk1 = keyGen.getKey(baseRecord);
+ assertEquals(hk1.getPartitionPath(), "2024-10-04 12");
// test w/ Row
baseRow = genericRecordToRow(baseRecord);
assertEquals("2024-10-04 12", keyGen.getPartitionPath(baseRow));
+
+ // timezone is GMT, createTime is null
+ baseRecord.put("createTime", null);
+ properties = getBaseKeyConfig("SCALAR", "yyyy-MM-dd hh", "GMT", "days");
+ keyGen = new TimestampBasedKeyGenerator(properties);
+ HoodieKey hk2 = keyGen.getKey(baseRecord);
+ assertEquals("1970-01-02 12", hk2.getPartitionPath());
+
+ // test w/ Row
+ baseRow = genericRecordToRow(baseRecord);
+ assertEquals("1970-01-02 12", keyGen.getPartitionPath(baseRow));
+
}
@Test