You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text rendering).
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/11/02 21:17:44 UTC
[hudi] branch master updated: [HUDI-912] Refactor and relocate
KeyGenerator to support more engines (#2200)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d160abb [HUDI-912] Refactor and relocate KeyGenerator to support more engines (#2200)
d160abb is described below
commit d160abb43740e0bcdf40458c345ecd2d74e6698c
Author: wangxianghu <wx...@126.com>
AuthorDate: Tue Nov 3 05:12:51 2020 +0800
[HUDI-912] Refactor and relocate KeyGenerator to support more engines (#2200)
* [HUDI-912] Refactor and relocate KeyGenerator to support more engines
* Rename KeyGenerators
---
hudi-client/hudi-client-common/pom.xml | 5 +
.../exception/HoodieKeyGeneratorException.java | 33 +++++
.../org/apache/hudi/keygen/BaseKeyGenerator.java | 81 +++++++++++
.../hudi/keygen/ComplexAvroKeyGenerator.java | 50 +++++++
.../apache/hudi/keygen/CustomAvroKeyGenerator.java | 73 ++++------
.../hudi/keygen/GlobalAvroDeleteKeyGenerator.java | 59 ++++++++
.../java/org/apache/hudi/keygen/KeyGenUtils.java | 33 ++++-
.../java/org/apache/hudi/keygen/KeyGenerator.java | 38 +----
.../keygen/NonpartitionedAvroKeyGenerator.java | 28 ++--
.../apache/hudi/keygen/SimpleAvroKeyGenerator.java | 57 ++++++++
.../keygen/TimestampBasedAvroKeyGenerator.java | 79 ++++-------
.../hudi/keygen/constant/KeyGeneratorOptions.java | 41 ++++++
.../parser/AbstractHoodieDateTimeParser.java | 6 +-
.../keygen/parser/HoodieDateTimeParserImpl.java | 10 +-
hudi-client/hudi-spark-client/pom.xml | 7 +
.../apache/hudi/keygen/BuiltinKeyGenerator.java | 80 +++++------
.../apache/hudi/keygen/ComplexKeyGenerator.java | 36 ++---
.../org/apache/hudi/keygen/CustomKeyGenerator.java | 70 ++++------
.../hudi/keygen/GlobalDeleteKeyGenerator.java | 18 +--
.../hudi/keygen/NonpartitionedKeyGenerator.java | 20 +--
.../apache/hudi/keygen/RowKeyGeneratorHelper.java | 4 +-
.../org/apache/hudi/keygen/SimpleKeyGenerator.java | 16 ++-
.../hudi/keygen/TimestampBasedKeyGenerator.java | 82 +++++++++++
.../org/apache/hudi/AvroConversionHelper.scala | 17 +--
.../org/apache/hudi/AvroConversionUtils.scala | 21 +--
.../hudi/keygen/TestComplexKeyGenerator.java | 154 +++++++++++++++++++++
.../apache/hudi/keygen/TestCustomKeyGenerator.java | 50 +++----
.../hudi/keygen/TestGlobalDeleteKeyGenerator.java | 14 +-
.../apache/hudi/keygen/TestSimpleKeyGenerator.java | 22 +--
.../keygen/TestTimestampBasedKeyGenerator.java | 50 +++----
.../hudi/testutils/KeyGeneratorTestUtilities.java | 0
.../apache/hudi/HoodieDatasetBulkInsertHelper.java | 4 +-
.../scala/org/apache/hudi/DataSourceOptions.scala | 13 +-
.../org/apache/hudi/HoodieMergeOnReadRDD.scala | 1 -
.../src/test/java/TestComplexKeyGenerator.java | 86 ------------
.../hudi/keygen/TestComplexKeyGenerator.java | 96 -------------
.../org/apache/hudi/TestAvroConversionHelper.scala | 3 +-
.../org/apache/hudi/TestDataSourceDefaults.scala | 19 ++-
38 files changed, 895 insertions(+), 581 deletions(-)
diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml
index 902de58..487a2e2 100644
--- a/hudi-client/hudi-client-common/pom.xml
+++ b/hudi-client/hudi-client-common/pom.xml
@@ -50,6 +50,11 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </dependency>
+
<!-- Logging -->
<dependency>
<groupId>log4j</groupId>
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieKeyGeneratorException.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieKeyGeneratorException.java
new file mode 100644
index 0000000..5bf06e2
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieKeyGeneratorException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+/**
+ * Raised when a {@link org.apache.hudi.keygen.KeyGeneratorInterface} hits a higher-level error while
+ * generating a {@link org.apache.hudi.common.model.HoodieKey}.
+ */
+public class HoodieKeyGeneratorException extends HoodieException {
+
+  /**
+   * @param msg description of the failure
+   */
+  public HoodieKeyGeneratorException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * @param msg   description of the failure
+   * @param cause underlying error that triggered the failure
+   */
+  public HoodieKeyGeneratorException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
new file mode 100644
index 0000000..8020be8
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Common base for the built-in Avro key generators. Subclasses assign {@link #recordKeyFields} and
+ * {@link #partitionPathFields} (or override their getters) and implement record-key and partition-path
+ * extraction; {@link #getKey(GenericRecord)} combines the two into a {@link HoodieKey}.
+ */
+public abstract class BaseKeyGenerator extends KeyGenerator {
+
+  // Configured field names; not assigned here, subclass constructors populate them.
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+  // Partition-path formatting flags, resolved from the config once at construction time.
+  protected final boolean encodePartitionPath;
+  protected final boolean hiveStylePartitioning;
+
+  protected BaseKeyGenerator(TypedProperties config) {
+    super(config);
+    boolean urlEncodeByDefault = Boolean.parseBoolean(KeyGeneratorOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL);
+    boolean hiveStyleByDefault = Boolean.parseBoolean(KeyGeneratorOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL);
+    this.encodePartitionPath = config.getBoolean(KeyGeneratorOptions.URL_ENCODE_PARTITIONING_OPT_KEY, urlEncodeByDefault);
+    this.hiveStylePartitioning = config.getBoolean(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, hiveStyleByDefault);
+  }
+
+  /**
+   * Extracts the record key from the given Avro record.
+   */
+  public abstract String getRecordKey(GenericRecord record);
+
+  /**
+   * Extracts the partition path from the given Avro record.
+   */
+  public abstract String getPartitionPath(GenericRecord record);
+
+  /**
+   * Builds the {@link HoodieKey} for {@code record} from {@link #getRecordKey(GenericRecord)} and
+   * {@link #getPartitionPath(GenericRecord)}, evaluated in that order.
+   *
+   * @throws HoodieKeyException if either configured field list is {@code null}
+   */
+  @Override
+  public final HoodieKey getKey(GenericRecord record) {
+    boolean fieldsUnset = getRecordKeyFields() == null || getPartitionPathFields() == null;
+    if (fieldsUnset) {
+      throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
+    }
+    String recordKey = getRecordKey(record);
+    String partitionPath = getPartitionPath(record);
+    return new HoodieKey(recordKey, partitionPath);
+  }
+
+  /**
+   * Returns the record key field names reduced to their top-level column, e.g. {@code a.b.c} becomes
+   * {@code a}. Names whose first '.' is missing or at index 0 are returned unchanged.
+   */
+  @Override
+  public final List<String> getRecordKeyFieldNames() {
+    return getRecordKeyFields().stream()
+        .map(BaseKeyGenerator::topLevelColumn)
+        .collect(Collectors.toList());
+  }
+
+  // Prefix of a (possibly nested) field path before its first '.', or the whole path when the
+  // first '.' is absent or at index 0.
+  private static String topLevelColumn(String fieldPath) {
+    int dot = fieldPath.indexOf('.');
+    return dot > 0 ? fieldPath.substring(0, dot) : fieldPath;
+  }
+
+  /**
+   * Returns the configured record key field names as assigned by the subclass (may be {@code null}).
+   */
+  public List<String> getRecordKeyFields() {
+    return recordKeyFields;
+  }
+
+  /**
+   * Returns the configured partition path field names as assigned by the subclass (may be {@code null}).
+   */
+  public List<String> getPartitionPathFields() {
+    return partitionPathFields;
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
new file mode 100644
index 0000000..edc1ad9
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+/**
+ * Avro key generator for composite keys: the record key and partition path fields are each read from the
+ * config as a comma-separated list, with entries trimmed and blank entries skipped.
+ */
+public class ComplexAvroKeyGenerator extends BaseKeyGenerator {
+
+  // Not referenced within this class.
+  public static final String DEFAULT_RECORD_KEY_SEPARATOR = ":";
+
+  public ComplexAvroKeyGenerator(TypedProperties props) {
+    super(props);
+    String recordKeyConfig = props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY);
+    this.recordKeyFields = Arrays.stream(recordKeyConfig.split(","))
+        .map(String::trim)
+        .filter(field -> !field.isEmpty())
+        .collect(Collectors.toList());
+    String partitionPathConfig = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY);
+    this.partitionPathFields = Arrays.stream(partitionPathConfig.split(","))
+        .map(String::trim)
+        .filter(field -> !field.isEmpty())
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Delegates to {@code KeyGenUtils.getRecordKey} with every configured record key field.
+   */
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
+  }
+
+  /**
+   * Delegates to {@code KeyGenUtils.getRecordPartitionPath} with every configured partition path field
+   * and this generator's hive-style and URL-encoding flags.
+   */
+  @Override
+  public String getPartitionPath(GenericRecord record) {
+    return KeyGenUtils.getRecordPartitionPath(record, getPartitionPathFields(), hiveStylePartitioning, encodePartitionPath);
+  }
+}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
similarity index 64%
copy from hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
copy to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
index e457688..6266fd1 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
@@ -18,14 +18,11 @@
package org.apache.hudi.keygen;
-import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieDeltaStreamerException;
import org.apache.hudi.exception.HoodieKeyException;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.spark.sql.Row;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import java.io.IOException;
import java.util.Arrays;
@@ -34,17 +31,17 @@ import java.util.stream.Collectors;
/**
* This is a generic implementation of KeyGenerator where users can configure record key as a single field or a combination of fields. Similarly partition path can be configured to have multiple
* fields or only one field. This class expects value for prop "hoodie.datasource.write.partitionpath.field" in a specific format. For example:
- *
+ * <p>
* properties.put("hoodie.datasource.write.partitionpath.field", "field1:PartitionKeyType1,field2:PartitionKeyType2").
- *
+ * <p>
* The complete partition path is created as <value for field1 basis PartitionKeyType1>/<value for field2 basis PartitionKeyType2> and so on.
- *
+ * <p>
* Few points to consider: 1. If you want to customize some partition path field on a timestamp basis, you can use field1:timestampBased 2. If you simply want to have the value of your configured
* field in the partition path, use field1:simple 3. If you want your table to be non partitioned, simply leave it as blank.
- *
+ * <p>
* RecordKey is internally generated using either SimpleKeyGenerator or ComplexKeyGenerator.
*/
-public class CustomKeyGenerator extends BuiltinKeyGenerator {
+public class CustomAvroKeyGenerator extends BaseKeyGenerator {
private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
private static final String SPLIT_REGEX = ":";
@@ -56,23 +53,14 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
SIMPLE, TIMESTAMP
}
- public CustomKeyGenerator(TypedProperties props) {
+ public CustomAvroKeyGenerator(TypedProperties props) {
super(props);
- this.recordKeyFields = Arrays.stream(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
- this.partitionPathFields = Arrays.stream(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
- }
-
- @Override
- public String getPartitionPath(Row row) {
- return getPartitionPath(Option.empty(), Option.of(row));
+ this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY).split(",")).map(String::trim).collect(Collectors.toList());
+ this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY).split(",")).map(String::trim).collect(Collectors.toList());
}
@Override
public String getPartitionPath(GenericRecord record) {
- return getPartitionPath(Option.of(record), Option.empty());
- }
-
- private String getPartitionPath(Option<GenericRecord> record, Option<Row> row) {
if (getPartitionPathFields() == null) {
throw new HoodieKeyException("Unable to find field names for partition path in cfg");
}
@@ -94,27 +82,18 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
       PartitionKeyType keyType = PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
       switch (keyType) {
         case SIMPLE:
-          if (record.isPresent()) {
-            partitionPath.append(new SimpleKeyGenerator(config, partitionPathField).getPartitionPath(record.get()));
-          } else {
-            partitionPath.append(new SimpleKeyGenerator(config, partitionPathField).getPartitionPath(row.get()));
-          }
+          partitionPath.append(new SimpleAvroKeyGenerator(config, partitionPathField).getPartitionPath(record));
           break;
         case TIMESTAMP:
           try {
-            if (record.isPresent()) {
-              partitionPath.append(new TimestampBasedKeyGenerator(config, partitionPathField).getPartitionPath(record.get()));
-            } else {
-              partitionPath.append(new TimestampBasedKeyGenerator(config, partitionPathField).getPartitionPath(row.get()));
-            }
-          } catch (IOException ioe) {
-            throw new HoodieDeltaStreamerException("Unable to initialise TimestampBasedKeyGenerator class");
+            partitionPath.append(new TimestampBasedAvroKeyGenerator(config, partitionPathField).getPartitionPath(record));
+          } catch (IOException e) {
+            throw new HoodieKeyGeneratorException("Unable to initialise TimestampBasedKeyGenerator class", e); // keep root cause
           }
           break;
         default:
-          throw new HoodieDeltaStreamerException("Please provide valid PartitionKeyType with fields! You provided: " + keyType);
+          throw new HoodieKeyGeneratorException("Please provide valid PartitionKeyType with fields! You provided: " + keyType);
       }
-
       partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
     }
     partitionPath.deleteCharAt(partitionPath.length() - 1);
@@ -125,16 +104,8 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
public String getRecordKey(GenericRecord record) {
validateRecordKeyFields();
return getRecordKeyFields().size() == 1
- ? new SimpleKeyGenerator(config).getRecordKey(record)
- : new ComplexKeyGenerator(config).getRecordKey(record);
- }
-
- @Override
- public String getRecordKey(Row row) {
- validateRecordKeyFields();
- return getRecordKeyFields().size() == 1
- ? new SimpleKeyGenerator(config).getRecordKey(row)
- : new ComplexKeyGenerator(config).getRecordKey(row);
+ ? new SimpleAvroKeyGenerator(config).getRecordKey(record)
+ : new ComplexAvroKeyGenerator(config).getRecordKey(record);
}
private void validateRecordKeyFields() {
@@ -142,4 +113,19 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
       throw new HoodieKeyException("Unable to find field names for record key in cfg");
     }
   }
+
+  /**
+   * Returns the separator appended between per-field partition path segments ({@code "/"}).
+   */
+  public String getDefaultPartitionPathSeparator() {
+    return CustomAvroKeyGenerator.DEFAULT_PARTITION_PATH_SEPARATOR;
+  }
+
+  /**
+   * Returns the delimiter between a field name and its partition key type in the partition path
+   * config, e.g. {@code field1:simple} ({@code ":"}).
+   */
+  public String getSplitRegex() {
+    return CustomAvroKeyGenerator.SPLIT_REGEX;
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
new file mode 100644
index 0000000..b074a25
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Avro key generator for deletes using global indices. Global index deletes do not require a partition
+ * value, so this key generator always uses an empty partition path when generating the HoodieKey.
+ */
+public class GlobalAvroDeleteKeyGenerator extends BaseKeyGenerator {
+
+  private static final String EMPTY_PARTITION = "";
+
+  /**
+   * Reads the comma-separated record key field list from the config. Entries are trimmed and blank
+   * entries dropped, as {@link ComplexAvroKeyGenerator} does, so {@code "id, ts"} yields the fields
+   * {@code id} and {@code ts} rather than a field literally named {@code " ts"}.
+   */
+  public GlobalAvroDeleteKeyGenerator(TypedProperties config) {
+    super(config);
+    this.recordKeyFields = Arrays.stream(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY).split(","))
+        .map(String::trim)
+        .filter(field -> !field.isEmpty())
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Returns the record key built by {@code KeyGenUtils.getRecordKey} from all configured record key fields.
+   */
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
+  }
+
+  /**
+   * Always returns the empty partition path, since global index deletes need no partition value.
+   */
+  @Override
+  public String getPartitionPath(GenericRecord record) {
+    return EMPTY_PARTITION;
+  }
+
+  /**
+   * Returns a new empty list: this generator has no partition path fields. The list is non-null, so the
+   * null check in {@link BaseKeyGenerator#getKey(GenericRecord)} passes.
+   */
+  @Override
+  public List<String> getPartitionPathFields() {
+    return new ArrayList<>();
+  }
+
+  /**
+   * Returns the constant empty partition path used by this generator.
+   */
+  public String getEmptyPartition() {
+    return EMPTY_PARTITION;
+  }
+}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
similarity index 82%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index 8e9700b..1f59bab 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -18,14 +18,20 @@
package org.apache.hudi.keygen;
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
public class KeyGenUtils {
@@ -111,4 +117,32 @@ public class KeyGenUtils {
     }
     return partitionPath;
   }
+
+  /**
+   * Instantiates the date-time parser class named by {@code parserClass} for TimestampBasedKeyGenerator,
+   * handing {@code props} to {@code ReflectionUtils.loadClass}.
+   *
+   * @throws IOException wrapping any {@link Throwable} raised while loading or casting the parser
+   */
+  public static AbstractHoodieDateTimeParser createDateTimeParser(TypedProperties props, String parserClass) throws IOException {
+    try {
+      Object parser = ReflectionUtils.loadClass(parserClass, props);
+      return (AbstractHoodieDateTimeParser) parser;
+    } catch (Throwable e) {
+      throw new IOException("Could not load date time parser class " + parserClass, e);
+    }
+  }
+
+  /**
+   * Verifies that every property name in {@code checkPropNames} is present in {@code props}.
+   *
+   * @throws HoodieNotSupportedException for the first missing property, in list order
+   */
+  public static void checkRequiredProperties(TypedProperties props, List<String> checkPropNames) {
+    for (String prop : checkPropNames) {
+      if (!props.containsKey(prop)) {
+        throw new HoodieNotSupportedException("Required property " + prop + " is missing");
+      }
+    }
+  }
 }
\ No newline at end of file
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
similarity index 57%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
index 3b1db08..8c3f794 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
@@ -19,30 +19,22 @@
package org.apache.hudi.keygen;
import org.apache.hudi.ApiMaturityLevel;
-import org.apache.hudi.AvroConversionHelper;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.avro.generic.GenericRecord;
-import org.apache.spark.sql.Row;
-import scala.Function1;
-import java.io.Serializable;
import java.util.List;
/**
* Abstract class to extend for plugging in extraction of {@link HoodieKey} from an Avro record.
*/
@PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
-public abstract class KeyGenerator implements Serializable, SparkKeyGeneratorInterface {
-
- private static final String STRUCT_NAME = "hoodieRowTopLevelField";
- private static final String NAMESPACE = "hoodieRow";
+public abstract class KeyGenerator implements KeyGeneratorInterface {
protected TypedProperties config;
- private transient Function1<Object, Object> converterFn = null;
protected KeyGenerator(TypedProperties config) {
this.config = config;
@@ -64,32 +56,4 @@ public abstract class KeyGenerator implements Serializable, SparkKeyGeneratorInt
throw new UnsupportedOperationException("Bootstrap not supported for key generator. "
+ "Please override this method in your custom key generator.");
}
-
- /**
- * Fetch record key from {@link Row}.
- * @param row instance of {@link Row} from which record key is requested.
- * @return the record key of interest from {@link Row}.
- */
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- public String getRecordKey(Row row) {
- if (null == converterFn) {
- converterFn = AvroConversionHelper.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
- }
- GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
- return getKey(genericRecord).getRecordKey();
- }
-
- /**
- * Fetch partition path from {@link Row}.
- * @param row instance of {@link Row} from which partition path is requested
- * @return the partition path of interest from {@link Row}.
- */
- @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- public String getPartitionPath(Row row) {
- if (null == converterFn) {
- converterFn = AvroConversionHelper.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
- }
- GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
- return getKey(genericRecord).getPartitionPath();
- }
}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
similarity index 55%
copy from hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
copy to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
index db51024..a5272b3 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
@@ -1,13 +1,12 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,23 +17,21 @@
package org.apache.hudi.keygen;
-import org.apache.hudi.common.config.TypedProperties;
-
import org.apache.avro.generic.GenericRecord;
-import org.apache.spark.sql.Row;
+import org.apache.hudi.common.config.TypedProperties;
import java.util.ArrayList;
import java.util.List;
/**
- * Simple Key generator for unpartitioned Hive Tables.
+ * Avro simple Key generator for unpartitioned Hive Tables.
*/
-public class NonpartitionedKeyGenerator extends SimpleKeyGenerator {
+public class NonpartitionedAvroKeyGenerator extends SimpleAvroKeyGenerator {
private static final String EMPTY_PARTITION = "";
private static final List<String> EMPTY_PARTITION_FIELD_LIST = new ArrayList<>();
- public NonpartitionedKeyGenerator(TypedProperties props) {
+ public NonpartitionedAvroKeyGenerator(TypedProperties props) {
super(props);
}
@@ -48,8 +45,7 @@ public class NonpartitionedKeyGenerator extends SimpleKeyGenerator {
return EMPTY_PARTITION_FIELD_LIST;
}
- @Override
- public String getPartitionPath(Row row) {
+ public String getEmptyPartition() {
return EMPTY_PARTITION;
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
new file mode 100644
index 0000000..59fe6be
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import java.util.Collections;
+
+/**
+ * Avro simple key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
+ * Exactly one field is used for each.
+ */
+public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
+
+  /**
+   * Reads the record key field and the partition path field from the config.
+   */
+  public SimpleAvroKeyGenerator(TypedProperties props) {
+    this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY),
+        props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY));
+  }
+
+  /**
+   * Creates a generator that only derives partition paths. It has no record key field, so
+   * {@link #getRecordKey(GenericRecord)} throws a {@link HoodieKeyException} on such an instance.
+   */
+  SimpleAvroKeyGenerator(TypedProperties props, String partitionPathField) {
+    this(props, null, partitionPathField);
+  }
+
+  /**
+   * @param props              key generator config
+   * @param recordKeyField     record key field name, or {@code null} for none
+   * @param partitionPathField partition path field name
+   */
+  SimpleAvroKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) {
+    super(props);
+    this.recordKeyFields = recordKeyField == null
+        ? Collections.emptyList()
+        : Collections.singletonList(recordKeyField);
+    this.partitionPathFields = Collections.singletonList(partitionPathField);
+  }
+
+  /**
+   * Returns the value of the single configured record key field.
+   *
+   * @throws HoodieKeyException if no record key field is configured (e.g. an instance created with
+   *                            the partition-path-only constructor)
+   */
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    if (getRecordKeyFields() == null || getRecordKeyFields().isEmpty()) {
+      // Fail with a descriptive error instead of an IndexOutOfBoundsException from get(0).
+      throw new HoodieKeyException("Unable to find field name for record key in cfg");
+    }
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0));
+  }
+
+  /**
+   * Returns the partition path for the single configured partition path field, passing this
+   * generator's hive-style and URL-encoding flags.
+   */
+  @Override
+  public String getPartitionPath(GenericRecord record) {
+    return KeyGenUtils.getPartitionPath(record, getPartitionPathFields().get(0), hiveStylePartitioning, encodePartitionPath);
+  }
+}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
similarity index 72%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
index 97a7d2e..28048a1 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
@@ -1,13 +1,12 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,19 +17,16 @@
package org.apache.hudi.keygen;
-import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieDeltaStreamerException;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser;
import org.apache.hudi.keygen.parser.HoodieDateTimeParserImpl;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.spark.sql.Row;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
@@ -46,15 +42,11 @@ import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
-import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
-import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
/**
- * Key generator, that relies on timestamps for partitioning field. Still picks record key by name.
+ * Avro Key generator, that relies on timestamps for partitioning field. Still picks record key by name.
*/
-public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
-
+public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator {
public enum TimestampType implements Serializable {
UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, SCALAR
}
@@ -96,19 +88,19 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
static final String DATE_TIME_PARSER_PROP = "hoodie.deltastreamer.keygen.datetime.parser.class";
}
- public TimestampBasedKeyGenerator(TypedProperties config) throws IOException {
- this(config, config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()),
- config.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()));
+ public TimestampBasedAvroKeyGenerator(TypedProperties config) throws IOException {
+ this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY),
+ config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY));
}
- TimestampBasedKeyGenerator(TypedProperties config, String partitionPathField) throws IOException {
+ TimestampBasedAvroKeyGenerator(TypedProperties config, String partitionPathField) throws IOException {
this(config, null, partitionPathField);
}
- TimestampBasedKeyGenerator(TypedProperties config, String recordKeyField, String partitionPathField) throws IOException {
+ TimestampBasedAvroKeyGenerator(TypedProperties config, String recordKeyField, String partitionPathField) throws IOException {
super(config, recordKeyField, partitionPathField);
String dateTimeParserClass = config.getString(Config.DATE_TIME_PARSER_PROP, HoodieDateTimeParserImpl.class.getName());
- this.parser = DataSourceUtils.createDateTimeParser(config, dateTimeParserClass);
+ this.parser = KeyGenUtils.createDateTimeParser(config, dateTimeParserClass);
this.inputDateTimeZone = parser.getInputDateTimeZone();
this.outputDateTimeZone = parser.getOutputDateTimeZone();
this.outputDateFormat = parser.getOutputDateFormat();
@@ -128,8 +120,8 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
default:
timeUnit = null;
}
- this.encodePartitionPath = config.getBoolean(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY(),
- Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL()));
+ this.encodePartitionPath = config.getBoolean(KeyGeneratorOptions.URL_ENCODE_PARTITIONING_OPT_KEY,
+ Boolean.parseBoolean(KeyGeneratorOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL));
}
@Override
@@ -141,14 +133,14 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
try {
return getPartitionPath(partitionVal);
} catch (Exception e) {
- throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, e);
+ throw new HoodieKeyGeneratorException("Unable to parse input partition field :" + partitionVal, e);
}
}
/**
* Set default value to partitionVal if the input value of partitionPathField is null.
*/
- private Object getDefaultPartitionVal() {
+ public Object getDefaultPartitionVal() {
Object result = 1L;
if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
// since partitionVal is null, we can set a default value of any format as TIMESTAMP_INPUT_DATE_FORMAT_PROP
@@ -191,7 +183,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
* @param partitionVal partition path object value fetched from record/row
* @return the parsed partition path based on data type
*/
- private String getPartitionPath(Object partitionVal) {
+ public String getPartitionPath(Object partitionVal) {
initIfNeeded();
long timeMs;
if (partitionVal instanceof Double) {
@@ -202,7 +194,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
timeMs = convertLongTimeToMillis((Long) partitionVal);
} else if (partitionVal instanceof CharSequence) {
if (!inputFormatter.isPresent()) {
- throw new HoodieException("Missing inputformatter. Ensure " + Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
+ throw new HoodieException("Missing inputformatter. Ensure " + Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
}
DateTime parsedDateTime = inputFormatter.get().parseDateTime(partitionVal.toString());
if (this.outputDateTimeZone == null) {
@@ -235,27 +227,4 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
return MILLISECONDS.convert(partitionVal, timeUnit);
}
- @Override
- public String getRecordKey(Row row) {
- buildFieldPositionMapIfNeeded(row.schema());
- return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeyPositions, false);
- }
-
- @Override
- public String getPartitionPath(Row row) {
- Object fieldVal = null;
- buildFieldPositionMapIfNeeded(row.schema());
- Object partitionPathFieldVal = RowKeyGeneratorHelper.getNestedFieldVal(row, partitionPathPositions.get(getPartitionPathFields().get(0)));
- try {
- if (partitionPathFieldVal == null || partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER)
- || partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
- fieldVal = getDefaultPartitionVal();
- } else {
- fieldVal = partitionPathFieldVal;
- }
- return getPartitionPath(fieldVal);
- } catch (Exception e) {
- throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + fieldVal, e);
- }
- }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
new file mode 100644
index 0000000..da567e0
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen.constant;
+
+/**
+ * Configuration keys (and their defaults) shared by the engine-agnostic key generators.
+ */
+public class KeyGeneratorOptions {
+
+  /**
+   * Flag to indicate whether partition path values should be URL-encoded
+   * (config key {@code hoodie.datasource.write.partitionpath.urlencode}).
+   * Key generators read it into their {@code encodePartitionPath} setting.
+   * Defaults to {@link #DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL} ("false").
+   */
+  public static final String URL_ENCODE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.partitionpath.urlencode";
+  public static final String DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL = "false";
+
+  /**
+   * Flag to indicate whether to use Hive style partitioning.
+   * If set true, the names of partition folders follow <partition_column_name>=<partition_value> format.
+   * By default false (the names of partition folders are only partition values).
+   */
+  public static final String HIVE_STYLE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.hive_style_partitioning";
+  public static final String DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL = "false";
+
+  /**
+   * Record key field. Value to be used as the `recordKey` component of `HoodieKey`. Actual value
+   * will be obtained by invoking .toString() on the field value. Nested fields can be specified using
+   * the dot notation eg: `a.b.c`
+   */
+  public static final String RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field";
+
+  /**
+   * Partition path field. Value to be used as the `partitionPath` component of `HoodieKey`.
+   * Multi-field key generators (e.g. the complex key generator) accept a comma-separated list of fields here.
+   */
+  public static final String PARTITIONPATH_FIELD_OPT_KEY = "hoodie.datasource.write.partitionpath.field";
+}
+
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java
similarity index 88%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java
index 80e26cc..6fb05c3 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java
@@ -19,7 +19,7 @@ package org.apache.hudi.keygen.parser;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
+import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;
@@ -36,7 +36,7 @@ public abstract class AbstractHoodieDateTimeParser implements Serializable {
}
private String initInputDateFormatDelimiter() {
- String inputDateFormatDelimiter = config.getString(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, ",").trim();
+ String inputDateFormatDelimiter = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, ",").trim();
inputDateFormatDelimiter = inputDateFormatDelimiter.isEmpty() ? "," : inputDateFormatDelimiter;
return inputDateFormatDelimiter;
}
@@ -45,7 +45,7 @@ public abstract class AbstractHoodieDateTimeParser implements Serializable {
* Returns the output date format in which the partition paths will be created for the hudi dataset.
*/
public String getOutputDateFormat() {
- return config.getString(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP);
+ return config.getString(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP);
}
/**
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
similarity index 91%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
index 41452d0..81960ea 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
@@ -17,11 +17,11 @@
package org.apache.hudi.keygen.parser;
-import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.keygen.TimestampBasedKeyGenerator.Config;
-import org.apache.hudi.keygen.TimestampBasedKeyGenerator.TimestampType;
+import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType;
+import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config;
+import org.apache.hudi.keygen.KeyGenUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@@ -42,7 +42,7 @@ public class HoodieDateTimeParserImpl extends AbstractHoodieDateTimeParser {
public HoodieDateTimeParserImpl(TypedProperties config) {
super(config);
- DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP));
+ KeyGenUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP));
this.inputDateTimeZone = getInputDateTimeZone();
}
@@ -79,7 +79,7 @@ public class HoodieDateTimeParserImpl extends AbstractHoodieDateTimeParser {
public Option<DateTimeFormatter> getInputFormatter() {
TimestampType timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
- DataSourceUtils.checkRequiredProperties(config,
+ KeyGenUtils.checkRequiredProperties(config,
Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
this.configInputDateFormatList = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "");
return Option.of(getInputDateFormatter());
diff --git a/hudi-client/hudi-spark-client/pom.xml b/hudi-client/hudi-spark-client/pom.xml
index d99346d..5cc6ad6 100644
--- a/hudi-client/hudi-spark-client/pom.xml
+++ b/hudi-client/hudi-spark-client/pom.xml
@@ -31,6 +31,13 @@
<packaging>jar</packaging>
<dependencies>
+ <!-- Scala -->
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
<!-- Hudi -->
<dependency>
<groupId>org.apache.hudi</groupId>
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
similarity index 61%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
index 8c973a6..a0c1991 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
@@ -18,70 +18,67 @@
package org.apache.hudi.keygen;
-import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.exception.HoodieKeyException;
-
-import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
+import scala.Function1;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
/**
- * Base class for all the built-in key generators. Contains methods structured for
+ * Base class for the built-in key generators. Contains methods structured for
* code reuse amongst them.
*/
-public abstract class BuiltinKeyGenerator extends KeyGenerator {
+public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements SparkKeyGeneratorInterface {
- protected List<String> recordKeyFields;
- protected List<String> partitionPathFields;
- protected final boolean encodePartitionPath;
- protected final boolean hiveStylePartitioning;
+ private static final String STRUCT_NAME = "hoodieRowTopLevelField";
+ private static final String NAMESPACE = "hoodieRow";
+ private transient Function1<Object, Object> converterFn = null;
+ protected StructType structType;
protected Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
protected Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
- protected StructType structType;
protected BuiltinKeyGenerator(TypedProperties config) {
super(config);
- this.encodePartitionPath = config.getBoolean(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY(),
- Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL()));
- this.hiveStylePartitioning = config.getBoolean(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(),
- Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL()));
}
/**
- * Generate a record Key out of provided generic record.
- */
- public abstract String getRecordKey(GenericRecord record);
-
- /**
- * Generate a partition path out of provided generic record.
- */
- public abstract String getPartitionPath(GenericRecord record);
-
- /**
- * Generate a Hoodie Key out of provided generic record.
+ * Fetch record key from {@link Row}.
+ * @param row instance of {@link Row} from which record key is requested.
+ * @return the record key of interest from {@link Row}.
*/
- public final HoodieKey getKey(GenericRecord record) {
- if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
- throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
+ @Override
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public String getRecordKey(Row row) {
+ if (null == converterFn) {
+ converterFn = AvroConversionHelper.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
}
- return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+ GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
+ return getKey(genericRecord).getRecordKey();
}
+ /**
+ * Fetch partition path from {@link Row}.
+ * @param row instance of {@link Row} from which partition path is requested
+ * @return the partition path of interest from {@link Row}.
+ */
@Override
- public final List<String> getRecordKeyFieldNames() {
- // For nested columns, pick top level column name
- return getRecordKeyFields().stream().map(k -> {
- int idx = k.indexOf('.');
- return idx > 0 ? k.substring(0, idx) : k;
- }).collect(Collectors.toList());
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public String getPartitionPath(Row row) {
+ if (null == converterFn) {
+ converterFn = AvroConversionHelper.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
+ }
+ GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
+ return getKey(genericRecord).getPartitionPath();
}
void buildFieldPositionMapIfNeeded(StructType structType) {
@@ -119,12 +116,5 @@ public abstract class BuiltinKeyGenerator extends KeyGenerator {
this.structType = structType;
}
}
-
- public List<String> getRecordKeyFields() {
- return recordKeyFields;
- }
-
- public List<String> getPartitionPathFields() {
- return partitionPathFields;
- }
}
+
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
similarity index 61%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
index e679e99..36c8345 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
@@ -1,13 +1,12 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,38 +17,38 @@
package org.apache.hudi.keygen;
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.common.config.TypedProperties;
-
import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.spark.sql.Row;
import java.util.Arrays;
import java.util.stream.Collectors;
-import org.apache.spark.sql.Row;
/**
* Complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
*/
public class ComplexKeyGenerator extends BuiltinKeyGenerator {
- public static final String DEFAULT_RECORD_KEY_SEPARATOR = ":";
+ private final ComplexAvroKeyGenerator complexAvroKeyGenerator;
public ComplexKeyGenerator(TypedProperties props) {
super(props);
- this.recordKeyFields = Arrays.stream(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY())
+ this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY)
.split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
- this.partitionPathFields = Arrays.stream(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY())
+ this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY)
.split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
+ complexAvroKeyGenerator = new ComplexAvroKeyGenerator(props);
}
@Override
public String getRecordKey(GenericRecord record) {
- return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
+ return complexAvroKeyGenerator.getRecordKey(record);
}
@Override
public String getPartitionPath(GenericRecord record) {
- return KeyGenUtils.getRecordPartitionPath(record, getPartitionPathFields(), hiveStylePartitioning, encodePartitionPath);
+ return complexAvroKeyGenerator.getPartitionPath(record);
}
@Override
@@ -64,4 +63,5 @@ public class ComplexKeyGenerator extends BuiltinKeyGenerator {
return RowKeyGeneratorHelper.getPartitionPathFromRow(row, getPartitionPathFields(),
hiveStylePartitioning, partitionPathPositions);
}
-}
\ No newline at end of file
+
+}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
similarity index 75%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
index e457688..6727b79 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
@@ -18,13 +18,12 @@
package org.apache.hudi.keygen;
-import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieDeltaStreamerException;
import org.apache.hudi.exception.HoodieKeyException;
-
-import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.spark.sql.Row;
import java.io.IOException;
@@ -46,30 +45,36 @@ import java.util.stream.Collectors;
*/
public class CustomKeyGenerator extends BuiltinKeyGenerator {
- private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
- private static final String SPLIT_REGEX = ":";
-
- /**
- * Used as a part of config in CustomKeyGenerator.java.
- */
- public enum PartitionKeyType {
- SIMPLE, TIMESTAMP
- }
+ private final CustomAvroKeyGenerator customAvroKeyGenerator;
public CustomKeyGenerator(TypedProperties props) {
super(props);
- this.recordKeyFields = Arrays.stream(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
- this.partitionPathFields = Arrays.stream(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
+ this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY).split(",")).map(String::trim).collect(Collectors.toList());
+ this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY).split(",")).map(String::trim).collect(Collectors.toList());
+ customAvroKeyGenerator = new CustomAvroKeyGenerator(props);
}
@Override
- public String getPartitionPath(Row row) {
- return getPartitionPath(Option.empty(), Option.of(row));
+ public String getRecordKey(GenericRecord record) {
+ return customAvroKeyGenerator.getRecordKey(record);
}
@Override
public String getPartitionPath(GenericRecord record) {
- return getPartitionPath(Option.of(record), Option.empty());
+ return customAvroKeyGenerator.getPartitionPath(record);
+ }
+
+ @Override
+ public String getRecordKey(Row row) {
+ validateRecordKeyFields();
+ return getRecordKeyFields().size() == 1
+ ? new SimpleKeyGenerator(config).getRecordKey(row)
+ : new ComplexKeyGenerator(config).getRecordKey(row);
+ }
+
+ @Override
+ public String getPartitionPath(Row row) {
+ return getPartitionPath(Option.empty(), Option.of(row));
}
private String getPartitionPath(Option<GenericRecord> record, Option<Row> row) {
@@ -85,13 +90,13 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
return "";
}
for (String field : getPartitionPathFields()) {
- String[] fieldWithType = field.split(SPLIT_REGEX);
+ String[] fieldWithType = field.split(customAvroKeyGenerator.getSplitRegex());
if (fieldWithType.length != 2) {
- throw new HoodieKeyException("Unable to find field names for partition path in proper format");
+ throw new HoodieKeyGeneratorException("Unable to find field names for partition path in proper format");
}
partitionPathField = fieldWithType[0];
- PartitionKeyType keyType = PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
+ CustomAvroKeyGenerator.PartitionKeyType keyType = CustomAvroKeyGenerator.PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
switch (keyType) {
case SIMPLE:
if (record.isPresent()) {
@@ -108,38 +113,23 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
partitionPath.append(new TimestampBasedKeyGenerator(config, partitionPathField).getPartitionPath(row.get()));
}
} catch (IOException ioe) {
- throw new HoodieDeltaStreamerException("Unable to initialise TimestampBasedKeyGenerator class");
+ throw new HoodieKeyGeneratorException("Unable to initialise TimestampBasedKeyGenerator class");
}
break;
default:
- throw new HoodieDeltaStreamerException("Please provide valid PartitionKeyType with fields! You provided: " + keyType);
+ throw new HoodieKeyGeneratorException("Please provide valid PartitionKeyType with fields! You provided: " + keyType);
}
- partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
+ partitionPath.append(customAvroKeyGenerator.getDefaultPartitionPathSeparator());
}
partitionPath.deleteCharAt(partitionPath.length() - 1);
return partitionPath.toString();
}
- @Override
- public String getRecordKey(GenericRecord record) {
- validateRecordKeyFields();
- return getRecordKeyFields().size() == 1
- ? new SimpleKeyGenerator(config).getRecordKey(record)
- : new ComplexKeyGenerator(config).getRecordKey(record);
- }
-
- @Override
- public String getRecordKey(Row row) {
- validateRecordKeyFields();
- return getRecordKeyFields().size() == 1
- ? new SimpleKeyGenerator(config).getRecordKey(row)
- : new ComplexKeyGenerator(config).getRecordKey(row);
- }
-
private void validateRecordKeyFields() {
if (getRecordKeyFields() == null || getRecordKeyFields().isEmpty()) {
throw new HoodieKeyException("Unable to find field names for record key in cfg");
}
}
}
+
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
similarity index 78%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
index 243493b..5c9a813 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
@@ -18,10 +18,9 @@
package org.apache.hudi.keygen;
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.common.config.TypedProperties;
-
import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.spark.sql.Row;
import java.util.ArrayList;
@@ -34,21 +33,21 @@ import java.util.List;
*/
public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator {
- private static final String EMPTY_PARTITION = "";
-
+ private final GlobalAvroDeleteKeyGenerator globalAvroDeleteKeyGenerator;
public GlobalDeleteKeyGenerator(TypedProperties config) {
super(config);
- this.recordKeyFields = Arrays.asList(config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(","));
+ this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY).split(","));
+ globalAvroDeleteKeyGenerator = new GlobalAvroDeleteKeyGenerator(config);
}
@Override
public String getRecordKey(GenericRecord record) {
- return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
+ return globalAvroDeleteKeyGenerator.getRecordKey(record);
}
@Override
public String getPartitionPath(GenericRecord record) {
- return EMPTY_PARTITION;
+ return globalAvroDeleteKeyGenerator.getPartitionPath(record);
}
@Override
@@ -64,6 +63,7 @@ public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator {
@Override
public String getPartitionPath(Row row) {
- return EMPTY_PARTITION;
+ return globalAvroDeleteKeyGenerator.getEmptyPartition();
}
}
+
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
similarity index 75%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
index db51024..543e134 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
@@ -18,12 +18,10 @@
package org.apache.hudi.keygen;
-import org.apache.hudi.common.config.TypedProperties;
-
import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.spark.sql.Row;
-import java.util.ArrayList;
import java.util.List;
/**
@@ -31,25 +29,27 @@ import java.util.List;
*/
public class NonpartitionedKeyGenerator extends SimpleKeyGenerator {
- private static final String EMPTY_PARTITION = "";
- private static final List<String> EMPTY_PARTITION_FIELD_LIST = new ArrayList<>();
+ private final NonpartitionedAvroKeyGenerator nonpartitionedAvroKeyGenerator;
- public NonpartitionedKeyGenerator(TypedProperties props) {
- super(props);
+ public NonpartitionedKeyGenerator(TypedProperties config) {
+ super(config);
+ nonpartitionedAvroKeyGenerator = new NonpartitionedAvroKeyGenerator(config);
}
@Override
public String getPartitionPath(GenericRecord record) {
- return EMPTY_PARTITION;
+ return nonpartitionedAvroKeyGenerator.getPartitionPath(record);
}
@Override
public List<String> getPartitionPathFields() {
- return EMPTY_PARTITION_FIELD_LIST;
+ return nonpartitionedAvroKeyGenerator.getPartitionPathFields();
}
@Override
public String getPartitionPath(Row row) {
- return EMPTY_PARTITION;
+ return nonpartitionedAvroKeyGenerator.getEmptyPartition();
}
+
}
+
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
similarity index 99%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
index 4c05489..dd0d4c5 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
@@ -19,10 +19,10 @@
package org.apache.hudi.keygen;
import org.apache.hudi.exception.HoodieKeyException;
-
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
+import scala.Option;
import java.util.ArrayList;
import java.util.Arrays;
@@ -33,8 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import scala.Option;
-
import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
similarity index 82%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
index c2b8b12..332686d 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
@@ -18,10 +18,9 @@
package org.apache.hudi.keygen;
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.common.config.TypedProperties;
-
import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.spark.sql.Row;
import java.util.Collections;
@@ -31,9 +30,11 @@ import java.util.Collections;
*/
public class SimpleKeyGenerator extends BuiltinKeyGenerator {
+ private final SimpleAvroKeyGenerator simpleAvroKeyGenerator;
+
public SimpleKeyGenerator(TypedProperties props) {
- this(props, props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()),
- props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()));
+ this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY),
+ props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY));
}
SimpleKeyGenerator(TypedProperties props, String partitionPathField) {
@@ -46,16 +47,17 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator {
? Collections.emptyList()
: Collections.singletonList(recordKeyField);
this.partitionPathFields = Collections.singletonList(partitionPathField);
+ simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, recordKeyField, partitionPathField);
}
@Override
public String getRecordKey(GenericRecord record) {
- return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0));
+ return simpleAvroKeyGenerator.getRecordKey(record);
}
@Override
public String getPartitionPath(GenericRecord record) {
- return KeyGenUtils.getPartitionPath(record, getPartitionPathFields().get(0), hiveStylePartitioning, encodePartitionPath);
+ return simpleAvroKeyGenerator.getPartitionPath(record);
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
new file mode 100644
index 0000000..859269c
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.spark.sql.Row;
+
+import java.io.IOException;
+
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
+
+/**
+ * Key generator, that relies on timestamps for partitioning field. Still picks record key by name.
+ */
+public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
+
+ private final TimestampBasedAvroKeyGenerator timestampBasedAvroKeyGenerator;
+
+ public TimestampBasedKeyGenerator(TypedProperties config) throws IOException {
+ this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY),
+ config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY));
+ }
+
+ TimestampBasedKeyGenerator(TypedProperties config, String partitionPathField) throws IOException {
+ this(config, null, partitionPathField);
+ }
+
+ TimestampBasedKeyGenerator(TypedProperties config, String recordKeyField, String partitionPathField) throws IOException {
+ super(config, recordKeyField, partitionPathField);
+ timestampBasedAvroKeyGenerator = new TimestampBasedAvroKeyGenerator(config, recordKeyField, partitionPathField);
+ }
+
+ @Override
+ public String getPartitionPath(GenericRecord record) {
+ return timestampBasedAvroKeyGenerator.getPartitionPath(record);
+ }
+
+ @Override
+ public String getRecordKey(Row row) {
+ buildFieldPositionMapIfNeeded(row.schema());
+ return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeyPositions, false);
+ }
+
+ @Override
+ public String getPartitionPath(Row row) {
+ Object fieldVal = null;
+ buildFieldPositionMapIfNeeded(row.schema());
+ Object partitionPathFieldVal = RowKeyGeneratorHelper.getNestedFieldVal(row, partitionPathPositions.get(getPartitionPathFields().get(0)));
+ try {
+ if (partitionPathFieldVal == null || partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER)
+ || partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
+ fieldVal = timestampBasedAvroKeyGenerator.getDefaultPartitionVal();
+ } else {
+ fieldVal = partitionPathFieldVal;
+ }
+ return timestampBasedAvroKeyGenerator.getPartitionPath(fieldVal);
+ } catch (Exception e) {
+ throw new HoodieKeyGeneratorException("Unable to parse input partition field :" + fieldVal, e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
similarity index 96%
rename from hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
rename to hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
index c701e70..db1ca6f 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
@@ -1,12 +1,13 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,10 +24,10 @@ import java.util
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
-import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.Schema.Type._
import org.apache.avro.generic.GenericData.{Fixed, Record}
import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord}
+import org.apache.avro.{LogicalTypes, Schema}
import org.apache.spark.sql.Row
import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.GenericRow
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
similarity index 88%
rename from hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
rename to hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 70a1356..d1a4249 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -1,12 +1,13 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,14 +18,14 @@
package org.apache.hudi
-import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
-import org.apache.hudi.common.model.HoodieKey
import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieKey
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import scala.collection.JavaConverters._
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
new file mode 100644
index 0000000..54f4ffa
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static junit.framework.TestCase.assertEquals;
+
+public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
+
+ private TypedProperties getCommonProps(boolean getComplexRecordKey) {
+ TypedProperties properties = new TypedProperties();
+ if (getComplexRecordKey) {
+ properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key, pii_col");
+ } else {
+ properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
+ }
+ properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true");
+ return properties;
+ }
+
+ private TypedProperties getPropertiesWithoutPartitionPathProp() {
+ return getCommonProps(false);
+ }
+
+ private TypedProperties getPropertiesWithoutRecordKeyProp() {
+ TypedProperties properties = new TypedProperties();
+ properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
+ return properties;
+ }
+
+ private TypedProperties getWrongRecordKeyFieldProps() {
+ TypedProperties properties = new TypedProperties();
+ properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
+ properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_wrong_key");
+ return properties;
+ }
+
+ private TypedProperties getProps() {
+ TypedProperties properties = getCommonProps(true);
+ properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp,ts_ms");
+ return properties;
+ }
+
+ @Test
+ public void testNullPartitionPathFields() {
+ Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutPartitionPathProp()));
+ }
+
+ @Test
+ public void testNullRecordKeyFields() {
+ Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+ }
+
+ @Test
+ public void testWrongRecordKeyField() {
+ ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getWrongRecordKeyFieldProps());
+ Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
+ Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType));
+ }
+
+ @Test
+ public void testHappyFlow() {
+ ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getProps());
+ GenericRecord record = getRecord();
+ HoodieKey key = keyGenerator.getKey(record);
+ Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
+ Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686/ts_ms=2020-03-21");
+ Row row = KeyGeneratorTestUtilities.getRow(record);
+ Assertions.assertEquals(keyGenerator.getRecordKey(row), "_row_key:key1,pii_col:pi");
+ Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686/ts_ms=2020-03-21");
+ }
+
+ @Test
+ public void testSingleValueKeyGenerator() {
+ TypedProperties properties = new TypedProperties();
+ properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
+ properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
+ ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
+ assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 1);
+ assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 1);
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
+ String rowKey = record.get("_row_key").toString();
+ String partitionPath = record.get("timestamp").toString();
+ HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
+ assertEquals("_row_key:" + rowKey, hoodieKey.getRecordKey());
+ assertEquals(partitionPath, hoodieKey.getPartitionPath());
+ }
+
+ @Test
+ public void testMultipleValueKeyGenerator() {
+ TypedProperties properties = new TypedProperties();
+ properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key,timestamp");
+ properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "rider,driver");
+ ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
+ assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 2);
+ assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 2);
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
+ String rowKey =
+ "_row_key" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("_row_key").toString() + ","
+ + "timestamp" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("timestamp").toString();
+ String partitionPath = record.get("rider").toString() + "/" + record.get("driver").toString();
+ HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
+ assertEquals(rowKey, hoodieKey.getRecordKey());
+ assertEquals(partitionPath, hoodieKey.getPartitionPath());
+ }
+
+ @Test
+ public void testMultipleValueKeyGeneratorNonPartitioned() {
+ TypedProperties properties = new TypedProperties();
+ properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key,timestamp");
+ properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "");
+ ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
+ assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 2);
+ assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 0);
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
+ String rowKey =
+ "_row_key" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("_row_key").toString() + ","
+ + "timestamp" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("timestamp").toString();
+ String partitionPath = "";
+ HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
+ assertEquals(rowKey, hoodieKey.getRecordKey());
+ assertEquals(partitionPath, hoodieKey.getPartitionPath());
+ }
+}
diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
similarity index 77%
rename from hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
index add2547..dc30b93 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
@@ -18,11 +18,11 @@
package org.apache.hudi.keygen;
-import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;
@@ -33,48 +33,48 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
private TypedProperties getCommonProps(boolean getComplexRecordKey) {
TypedProperties properties = new TypedProperties();
if (getComplexRecordKey) {
- properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key, pii_col");
+ properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key, pii_col");
} else {
- properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
+ properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
}
- properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true");
+ properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true");
return properties;
}
private TypedProperties getPropertiesForSimpleKeyGen() {
TypedProperties properties = getCommonProps(false);
- properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple");
+ properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple");
return properties;
}
private TypedProperties getImproperPartitionFieldFormatProp() {
TypedProperties properties = getCommonProps(false);
- properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
+ properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
return properties;
}
private TypedProperties getInvalidPartitionKeyTypeProps() {
TypedProperties properties = getCommonProps(false);
- properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:dummy");
+ properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:dummy");
return properties;
}
private TypedProperties getComplexRecordKeyWithSimplePartitionProps() {
TypedProperties properties = getCommonProps(true);
- properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple");
+ properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple");
return properties;
}
private TypedProperties getComplexRecordKeyAndPartitionPathProps() {
TypedProperties properties = getCommonProps(true);
- properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple,ts_ms:timestamp");
+ properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple,ts_ms:timestamp");
populateNecessaryPropsForTimestampBasedKeyGen(properties);
return properties;
}
private TypedProperties getPropsWithoutRecordKeyFieldProps() {
TypedProperties properties = new TypedProperties();
- properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple");
+ properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple");
return properties;
}
@@ -86,20 +86,20 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
private TypedProperties getPropertiesForTimestampBasedKeyGen() {
TypedProperties properties = getCommonProps(false);
- properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "ts_ms:timestamp");
+ properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "ts_ms:timestamp");
populateNecessaryPropsForTimestampBasedKeyGen(properties);
return properties;
}
private TypedProperties getPropertiesForNonPartitionedKeyGen() {
TypedProperties properties = getCommonProps(false);
- properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "");
+ properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "");
return properties;
}
@Test
public void testSimpleKeyGenerator() {
- KeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForSimpleKeyGen());
+ BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForSimpleKeyGen());
GenericRecord record = getRecord();
HoodieKey key = keyGenerator.getKey(record);
Assertions.assertEquals(key.getRecordKey(), "key1");
@@ -111,7 +111,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
@Test
public void testTimestampBasedKeyGenerator() {
- KeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForTimestampBasedKeyGen());
+ BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForTimestampBasedKeyGen());
GenericRecord record = getRecord();
HoodieKey key = keyGenerator.getKey(record);
Assertions.assertEquals(key.getRecordKey(), "key1");
@@ -123,7 +123,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
@Test
public void testNonPartitionedKeyGenerator() {
- KeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForNonPartitionedKeyGen());
+ BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForNonPartitionedKeyGen());
GenericRecord record = getRecord();
HoodieKey key = keyGenerator.getKey(record);
Assertions.assertEquals(key.getRecordKey(), "key1");
@@ -136,28 +136,28 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
@Test
public void testInvalidPartitionKeyType() {
try {
- KeyGenerator keyGenerator = new CustomKeyGenerator(getInvalidPartitionKeyTypeProps());
+ BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getInvalidPartitionKeyTypeProps());
keyGenerator.getKey(getRecord());
Assertions.fail("should fail when invalid PartitionKeyType is provided!");
} catch (Exception e) {
- Assertions.assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.keygen.CustomKeyGenerator.PartitionKeyType.DUMMY"));
+ Assertions.assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"));
}
try {
- KeyGenerator keyGenerator = new CustomKeyGenerator(getInvalidPartitionKeyTypeProps());
+ BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getInvalidPartitionKeyTypeProps());
GenericRecord record = getRecord();
Row row = KeyGeneratorTestUtilities.getRow(record);
keyGenerator.getPartitionPath(row);
Assertions.fail("should fail when invalid PartitionKeyType is provided!");
} catch (Exception e) {
- Assertions.assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.keygen.CustomKeyGenerator.PartitionKeyType.DUMMY"));
+ Assertions.assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"));
}
}
@Test
public void testNoRecordKeyFieldProp() {
try {
- KeyGenerator keyGenerator = new CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps());
+ BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps());
keyGenerator.getKey(getRecord());
Assertions.fail("should fail when record key field is not provided!");
} catch (Exception e) {
@@ -165,7 +165,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
}
try {
- KeyGenerator keyGenerator = new CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps());
+ BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps());
GenericRecord record = getRecord();
Row row = KeyGeneratorTestUtilities.getRow(record);
keyGenerator.getRecordKey(row);
@@ -178,7 +178,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
@Test
public void testPartitionFieldsInImproperFormat() {
try {
- KeyGenerator keyGenerator = new CustomKeyGenerator(getImproperPartitionFieldFormatProp());
+ BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getImproperPartitionFieldFormatProp());
keyGenerator.getKey(getRecord());
Assertions.fail("should fail when partition key field is provided in improper format!");
} catch (Exception e) {
@@ -186,7 +186,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
}
try {
- KeyGenerator keyGenerator = new CustomKeyGenerator(getImproperPartitionFieldFormatProp());
+ BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getImproperPartitionFieldFormatProp());
GenericRecord record = getRecord();
Row row = KeyGeneratorTestUtilities.getRow(record);
keyGenerator.getPartitionPath(row);
@@ -198,7 +198,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
@Test
public void testComplexRecordKeyWithSimplePartitionPath() {
- KeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyWithSimplePartitionProps());
+ BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyWithSimplePartitionProps());
GenericRecord record = getRecord();
HoodieKey key = keyGenerator.getKey(record);
Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
@@ -211,7 +211,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
@Test
public void testComplexRecordKeysWithComplexPartitionPath() {
- KeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyAndPartitionPathProps());
+ BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyAndPartitionPathProps());
GenericRecord record = getRecord();
HoodieKey key = keyGenerator.getKey(record);
Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java
similarity index 84%
rename from hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java
index 96d607a..078101b 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java
@@ -18,12 +18,12 @@
package org.apache.hudi.keygen;
-import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;
@@ -34,29 +34,29 @@ public class TestGlobalDeleteKeyGenerator extends KeyGeneratorTestUtilities {
private TypedProperties getCommonProps(boolean getComplexRecordKey) {
TypedProperties properties = new TypedProperties();
if (getComplexRecordKey) {
- properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,pii_col");
+ properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key,pii_col");
} else {
- properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
+ properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
}
- properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true");
+ properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true");
return properties;
}
private TypedProperties getPropertiesWithoutRecordKeyProp() {
TypedProperties properties = new TypedProperties();
- properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
+ properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
return properties;
}
private TypedProperties getWrongRecordKeyFieldProps() {
TypedProperties properties = new TypedProperties();
- properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_wrong_key");
+ properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_wrong_key");
return properties;
}
private TypedProperties getProps() {
TypedProperties properties = getCommonProps(true);
- properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp,ts_ms");
+ properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp,ts_ms");
return properties;
}
diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
similarity index 82%
rename from hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
index 4eb184e..80b85d8 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
@@ -18,12 +18,12 @@
package org.apache.hudi.keygen;
-import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Assertions;
@@ -33,8 +33,8 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities {
private TypedProperties getCommonProps() {
TypedProperties properties = new TypedProperties();
- properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
- properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true");
+ properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
+ properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true");
return properties;
}
@@ -44,34 +44,34 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities {
private TypedProperties getPropertiesWithoutRecordKeyProp() {
TypedProperties properties = new TypedProperties();
- properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
+ properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
return properties;
}
private TypedProperties getWrongRecordKeyFieldProps() {
TypedProperties properties = new TypedProperties();
- properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
- properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_wrong_key");
+ properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
+ properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_wrong_key");
return properties;
}
private TypedProperties getWrongPartitionPathFieldProps() {
TypedProperties properties = new TypedProperties();
- properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "_wrong_partition_path");
- properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
+ properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "_wrong_partition_path");
+ properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
return properties;
}
private TypedProperties getComplexRecordKeyProp() {
TypedProperties properties = new TypedProperties();
- properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
- properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,pii_col");
+ properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
+ properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key,pii_col");
return properties;
}
private TypedProperties getProps() {
TypedProperties properties = getCommonProps();
- properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
+ properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
return properties;
}
diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
similarity index 84%
rename from hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
index 7867415..98a8f67 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
@@ -20,14 +20,14 @@ package org.apache.hudi.keygen;
import org.apache.hudi.AvroConversionHelper;
import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.testutils.SchemaTestUtil;
-import org.apache.hudi.exception.HoodieDeltaStreamerException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.StructType;
@@ -58,15 +58,15 @@ public class TestTimestampBasedKeyGenerator {
.generateAvroRecordFromJson(schema, 1, "001", "f1");
baseRow = genericRecordToRow(baseRecord);
- properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "field1");
- properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "createTime");
- properties.setProperty(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "false");
+ properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "field1");
+ properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "createTime");
+ properties.setProperty(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "false");
}
private TypedProperties getBaseKeyConfig(String timestampType, String dateFormat, String timezone, String scalarType) {
- properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType);
- properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, dateFormat);
- properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, timezone);
+ properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType);
+ properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, dateFormat);
+ properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, timezone);
if (scalarType != null) {
properties.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit", scalarType);
@@ -88,22 +88,22 @@ public class TestTimestampBasedKeyGenerator {
private TypedProperties getBaseKeyConfig(String timestampType, String inputFormatList, String inputFormatDelimiterRegex, String inputTimezone, String outputFormat, String outputTimezone) {
if (timestampType != null) {
- properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType);
+ properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType);
}
if (inputFormatList != null) {
- properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, inputFormatList);
+ properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, inputFormatList);
}
if (inputFormatDelimiterRegex != null) {
- properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, inputFormatDelimiterRegex);
+ properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, inputFormatDelimiterRegex);
}
if (inputTimezone != null) {
- properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_TIMEZONE_FORMAT_PROP, inputTimezone);
+ properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_TIMEZONE_FORMAT_PROP, inputTimezone);
}
if (outputFormat != null) {
- properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, outputFormat);
+ properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, outputFormat);
}
if (outputTimezone != null) {
- properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, outputTimezone);
+ properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, outputTimezone);
}
return properties;
}
@@ -213,7 +213,7 @@ public class TestTimestampBasedKeyGenerator {
"",
"yyyyMMddHH",
"GMT");
- KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+ BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey hk1 = keyGen.getKey(baseRecord);
Assertions.assertEquals("2020040113", hk1.getPartitionPath());
@@ -231,7 +231,7 @@ public class TestTimestampBasedKeyGenerator {
"",
"yyyyMMddHH",
"");
- KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+ BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey hk1 = keyGen.getKey(baseRecord);
Assertions.assertEquals("2020040113", hk1.getPartitionPath());
@@ -249,7 +249,7 @@ public class TestTimestampBasedKeyGenerator {
"",
"yyyyMMddHH",
"UTC");
- KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+ BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey hk1 = keyGen.getKey(baseRecord);
Assertions.assertEquals("2020040113", hk1.getPartitionPath());
@@ -267,7 +267,7 @@ public class TestTimestampBasedKeyGenerator {
"",
"yyyyMMddHH",
"UTC");
- KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+ BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey hk1 = keyGen.getKey(baseRecord);
Assertions.assertEquals("2020040113", hk1.getPartitionPath());
@@ -285,7 +285,7 @@ public class TestTimestampBasedKeyGenerator {
"",
"yyyyMMddHH",
"UTC");
- KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+ BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey hk1 = keyGen.getKey(baseRecord);
Assertions.assertEquals("2020040118", hk1.getPartitionPath());
@@ -303,7 +303,7 @@ public class TestTimestampBasedKeyGenerator {
"",
"yyyyMMddHH",
"UTC");
- KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+ BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey hk1 = keyGen.getKey(baseRecord);
Assertions.assertEquals("2020040118", hk1.getPartitionPath());
@@ -321,7 +321,7 @@ public class TestTimestampBasedKeyGenerator {
"",
"yyyyMMddHH",
"EST");
- KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+ BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey hk1 = keyGen.getKey(baseRecord);
Assertions.assertEquals("2020040109", hk1.getPartitionPath());
@@ -339,11 +339,11 @@ public class TestTimestampBasedKeyGenerator {
"",
"yyyyMMddHH",
"UTC");
- KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
- Assertions.assertThrows(HoodieDeltaStreamerException.class, () -> keyGen.getKey(baseRecord));
+ BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+ Assertions.assertThrows(HoodieKeyGeneratorException.class, () -> keyGen.getKey(baseRecord));
baseRow = genericRecordToRow(baseRecord);
- Assertions.assertThrows(HoodieDeltaStreamerException.class, () -> keyGen.getPartitionPath(baseRow));
+ Assertions.assertThrows(HoodieKeyGeneratorException.class, () -> keyGen.getPartitionPath(baseRow));
}
@Test
@@ -356,7 +356,7 @@ public class TestTimestampBasedKeyGenerator {
"UTC",
"MM/dd/yyyy",
"UTC");
- KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+ BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
HoodieKey hk1 = keyGen.getKey(baseRecord);
Assertions.assertEquals("04/01/2020", hk1.getPartitionPath());
diff --git a/hudi-spark/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java
similarity index 100%
rename from hudi-spark/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java
diff --git a/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java b/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
index b3ed7ae..c820ebe 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -72,7 +72,7 @@ public class HoodieDatasetBulkInsertHelper {
TypedProperties properties = new TypedProperties();
properties.putAll(config.getProps());
String keyGeneratorClass = properties.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY());
- KeyGenerator keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, properties);
+ BuiltinKeyGenerator keyGenerator = (BuiltinKeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, properties);
StructType structTypeForUDF = rows.schema();
sqlContext.udf().register(RECORD_KEY_UDF_FN, (UDF1<Row, String>) keyGenerator::getRecordKey, DataTypes.StringType);
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index ce940f7..fc52b38 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.hive.HiveSyncTool
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
import org.apache.hudi.keygen.SimpleKeyGenerator
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.log4j.LogManager
/**
@@ -213,14 +214,14 @@ object DataSourceWriteOptions {
* the dot notation eg: `a.b.c`
*
*/
- val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
+ val RECORDKEY_FIELD_OPT_KEY = KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY
val DEFAULT_RECORDKEY_FIELD_OPT_VAL = "uuid"
/**
* Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`. Actual
* value ontained by invoking .toString()
*/
- val PARTITIONPATH_FIELD_OPT_KEY = "hoodie.datasource.write.partitionpath.field"
+ val PARTITIONPATH_FIELD_OPT_KEY = KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY
val DEFAULT_PARTITIONPATH_FIELD_OPT_VAL = "partitionpath"
/**
@@ -228,10 +229,10 @@ object DataSourceWriteOptions {
* If set true, the names of partition folders follow <partition_column_name>=<partition_value> format.
* By default false (the names of partition folders are only partition values)
*/
- val HIVE_STYLE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.hive_style_partitioning"
- val DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL = "false"
- val URL_ENCODE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.partitionpath.urlencode"
- val DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL = "false"
+ val HIVE_STYLE_PARTITIONING_OPT_KEY = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY
+ val DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL = KeyGeneratorOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL
+ val URL_ENCODE_PARTITIONING_OPT_KEY = KeyGeneratorOptions.URL_ENCODE_PARTITIONING_OPT_KEY
+ val DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL = KeyGeneratorOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL
/**
* Key generator class, that implements will extract the key out of incoming record
*
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index 32d6a09..1481024 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
-
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
diff --git a/hudi-spark/src/test/java/TestComplexKeyGenerator.java b/hudi-spark/src/test/java/TestComplexKeyGenerator.java
deleted file mode 100644
index a5a88c2..0000000
--- a/hudi-spark/src/test/java/TestComplexKeyGenerator.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import static junit.framework.TestCase.assertEquals;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.keygen.ComplexKeyGenerator;
-import org.junit.jupiter.api.Test;
-
-public class TestComplexKeyGenerator {
-
- @Test
- public void testSingleValueKeyGenerator() {
- TypedProperties properties = new TypedProperties();
- properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
- properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
- ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
- assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 1);
- assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 1);
- HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
- GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
- String rowKey = record.get("_row_key").toString();
- String partitionPath = record.get("timestamp").toString();
- HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
- assertEquals("_row_key:" + rowKey, hoodieKey.getRecordKey());
- assertEquals(partitionPath, hoodieKey.getPartitionPath());
- }
-
- @Test
- public void testMultipleValueKeyGenerator() {
- TypedProperties properties = new TypedProperties();
- properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,timestamp");
- properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "rider,driver");
- ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
- assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 2);
- assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 2);
- HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
- GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
- String rowKey =
- "_row_key" + ComplexKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("_row_key").toString() + ","
- + "timestamp" + ComplexKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("timestamp").toString();
- String partitionPath = record.get("rider").toString() + "/" + record.get("driver").toString();
- HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
- assertEquals(rowKey, hoodieKey.getRecordKey());
- assertEquals(partitionPath, hoodieKey.getPartitionPath());
- }
-
- @Test
- public void testMultipleValueKeyGeneratorNonPartitioned() {
- TypedProperties properties = new TypedProperties();
- properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,timestamp");
- properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "");
- ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
- assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 2);
- assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 0);
- HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
- GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
- String rowKey =
- "_row_key" + ComplexKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("_row_key").toString() + ","
- + "timestamp" + ComplexKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("timestamp").toString();
- String partitionPath = "";
- HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
- assertEquals(rowKey, hoodieKey.getRecordKey());
- assertEquals(partitionPath, hoodieKey.getPartitionPath());
- }
-
-}
\ No newline at end of file
diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
deleted file mode 100644
index 4c5ded3..0000000
--- a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.keygen;
-
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.exception.HoodieKeyException;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
-import org.apache.spark.sql.Row;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
-
- private TypedProperties getCommonProps(boolean getComplexRecordKey) {
- TypedProperties properties = new TypedProperties();
- if (getComplexRecordKey) {
- properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key, pii_col");
- } else {
- properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
- }
- properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true");
- return properties;
- }
-
- private TypedProperties getPropertiesWithoutPartitionPathProp() {
- return getCommonProps(false);
- }
-
- private TypedProperties getPropertiesWithoutRecordKeyProp() {
- TypedProperties properties = new TypedProperties();
- properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
- return properties;
- }
-
- private TypedProperties getWrongRecordKeyFieldProps() {
- TypedProperties properties = new TypedProperties();
- properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
- properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_wrong_key");
- return properties;
- }
-
- private TypedProperties getProps() {
- TypedProperties properties = getCommonProps(true);
- properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp,ts_ms");
- return properties;
- }
-
- @Test
- public void testNullPartitionPathFields() {
- Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutPartitionPathProp()));
- }
-
- @Test
- public void testNullRecordKeyFields() {
- Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp()));
- }
-
- @Test
- public void testWrongRecordKeyField() {
- ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getWrongRecordKeyFieldProps());
- Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
- Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType));
- }
-
- @Test
- public void testHappyFlow() {
- ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getProps());
- GenericRecord record = getRecord();
- HoodieKey key = keyGenerator.getKey(record);
- Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
- Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686/ts_ms=2020-03-21");
- Row row = KeyGeneratorTestUtilities.getRow(record);
- Assertions.assertEquals(keyGenerator.getRecordKey(row), "_row_key:key1,pii_col:pi");
- Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686/ts_ms=2020-03-21");
- }
-}
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala b/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala
index 902359d..e299445 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala
@@ -22,7 +22,6 @@ import java.time.LocalDate
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
-import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.scalatest.{FunSuite, Matchers}
@@ -43,7 +42,7 @@ class TestAvroConversionHelper extends FunSuite with Matchers {
test("Logical type: date") {
val schema = new Schema.Parser().parse(dateSchema)
- val convertor = AvroConversionHelper.createConverterToRow(schema, convertAvroSchemaToStructType(schema))
+ val convertor = AvroConversionHelper.createConverterToRow(schema, AvroConversionUtils.convertAvroSchemaToStructType(schema))
val dateOutputData = dateInputData.map(x => {
val record = new GenericData.Record(schema) {{ put("date", x) }}
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala b/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
index 73e1f5d..99e1297 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
@@ -17,8 +17,6 @@
package org.apache.hudi
-import java.util
-
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.TypedProperties
@@ -236,14 +234,29 @@ class TestDataSourceDefaults {
assertEquals("name1", keyGen.getPartitionPath(baseRow))
}
- class UserDefinedKeyGenerator(props: TypedProperties) extends KeyGenerator(props) {
+ class UserDefinedKeyGenerator(props: TypedProperties) extends KeyGenerator(props) with SparkKeyGeneratorInterface {
val recordKeyProp: String = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY)
val partitionPathProp: String = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY)
+ val STRUCT_NAME: String = "hoodieRowTopLevelField"
+ val NAMESPACE: String = "hoodieRow"
+ var converterFn: Function1[Any, Any] = _
override def getKey(record: GenericRecord): HoodieKey = {
new HoodieKey(HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyProp, true),
HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathProp, true))
}
+
+ override def getRecordKey(row: Row): String = {
+ if (null == converterFn) converterFn = AvroConversionHelper.createConverterToAvro(row.schema, STRUCT_NAME, NAMESPACE)
+ val genericRecord = converterFn.apply(row).asInstanceOf[GenericRecord]
+ getKey(genericRecord).getRecordKey
+ }
+
+ override def getPartitionPath(row: Row): String = {
+ if (null == converterFn) converterFn = AvroConversionHelper.createConverterToAvro(row.schema, STRUCT_NAME, NAMESPACE)
+ val genericRecord = converterFn.apply(row).asInstanceOf[GenericRecord]
+ getKey(genericRecord).getPartitionPath
+ }
}
@Test def testComplexKeyGenerator() = {