You are viewing a plain-text version of this content. The canonical version is available at the original archive link (the hyperlink was lost in this plain-text conversion).
Posted to commits@hudi.apache.org by co...@apache.org on 2023/01/24 14:35:07 UTC
[hudi] branch master updated: [HUDI-5575] Adding/Fixing auto generation of record keys w/ hudi (#7726)
This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2fc20c186b7 [HUDI-5575] Adding/Fixing auto generation of record keys w/ hudi (#7726)
2fc20c186b7 is described below
commit 2fc20c186b77017f7f1c6f6abe8559a9e8cfe578
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Tue Jan 24 06:34:55 2023 -0800
[HUDI-5575] Adding/Fixing auto generation of record keys w/ hudi (#7726)
* Adding auto generation of record keys w/ hudi. Added support to all key gen classes
* addressing feedback
* Fix tests
Co-authored-by: Sagar Sumit <sa...@gmail.com>
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 9 ++
.../hudi/keygen/ComplexAvroKeyGenerator.java | 19 +++-
.../apache/hudi/keygen/CustomAvroKeyGenerator.java | 16 +--
.../hudi/keygen/GlobalAvroDeleteKeyGenerator.java | 10 +-
.../java/org/apache/hudi/keygen/KeyGenUtils.java | 6 +-
.../keygen/NonpartitionedAvroKeyGenerator.java | 17 +--
.../apache/hudi/keygen/SimpleAvroKeyGenerator.java | 13 ++-
...erator.java => TestAutoRecordKeyGenerator.java} | 53 +++++----
.../apache/hudi/keygen/ComplexKeyGenerator.java | 27 +++--
.../org/apache/hudi/keygen/CustomKeyGenerator.java | 21 ++--
.../hudi/keygen/GlobalDeleteKeyGenerator.java | 20 +++-
.../hudi/keygen/NonpartitionedKeyGenerator.java | 21 ++--
.../org/apache/hudi/keygen/SimpleKeyGenerator.java | 50 +++++----
.../hudi/keygen/TimestampBasedKeyGenerator.java | 18 +++-
.../org/apache/hudi/util/SparkKeyGenUtils.scala | 2 +-
.../apache/hudi/keygen/AutoRecordKeyGenerator.java | 43 ++++----
.../org/apache/hudi/keygen/BaseKeyGenerator.java | 13 +++
.../hudi/keygen/constant/KeyGeneratorOptions.java | 18 +++-
.../hudi/keygen/TestComplexKeyGenerator.java | 3 +-
.../apache/hudi/keygen/TestSimpleKeyGenerator.java | 3 +-
.../apache/hudi/TestAutoRecordKeyGeneration.scala | 119 +++++++++++++++++++++
.../org/apache/hudi/TestDataSourceDefaults.scala | 2 +-
.../deltastreamer/TestHoodieDeltaStreamer.java | 27 +++++
23 files changed, 404 insertions(+), 126 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d114cc3d778..8213f020f2a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2259,6 +2259,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE));
}
+ public Boolean doAutoGenerateRecordKeys() {
+ return getBooleanOrDefault(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS);
+ }
+
public boolean isEarlyConflictDetectionEnable() {
return getBoolean(EARLY_CONFLICT_DETECTION_ENABLE);
}
@@ -2805,6 +2809,11 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+ public Builder withAutoGenerateRecordKeys(boolean autoGenerateRecordKeys) {
+ writeConfig.setValue(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS, String.valueOf(autoGenerateRecordKeys));
+ return this;
+ }
+
public Builder withEarlyConflictDetectionEnable(boolean enable) {
writeConfig.setValue(EARLY_CONFLICT_DETECTION_ENABLE, String.valueOf(enable));
return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
index 9ff5c522e45..91449a07b70 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
@@ -17,13 +17,17 @@
package org.apache.hudi.keygen;
-import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.avro.generic.GenericRecord;
+
import java.util.Arrays;
+import java.util.Collections;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
/**
* Avro complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
*/
@@ -32,19 +36,26 @@ public class ComplexAvroKeyGenerator extends BaseKeyGenerator {
public ComplexAvroKeyGenerator(TypedProperties props) {
super(props);
- this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
+ this.recordKeyFields = props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
+ ? Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
- .collect(Collectors.toList());
+ .collect(Collectors.toList()) : Collections.EMPTY_LIST;
this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
+ instantiateAutoRecordKeyGenerator();
}
@Override
public String getRecordKey(GenericRecord record) {
- return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
+ if (autoGenerateRecordKeys) {
+ return autoRecordKeyGenerator.getRecordKey(record);
+ } else {
+ checkArgument(getRecordKeyFieldNames().size() > 0, "Record key fields cannot be empty");
+ return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
+ }
}
@Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
index 77377de7ab8..2c829f6bb98 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
@@ -43,9 +43,6 @@ import java.util.stream.Collectors;
*/
public class CustomAvroKeyGenerator extends BaseKeyGenerator {
- private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
- public static final String SPLIT_REGEX = ":";
-
/**
* Used as a part of config in CustomKeyGenerator.java.
*/
@@ -57,6 +54,7 @@ public class CustomAvroKeyGenerator extends BaseKeyGenerator {
super(props);
this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList());
this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList());
+ instantiateAutoRecordKeyGenerator();
}
@Override
@@ -102,10 +100,14 @@ public class CustomAvroKeyGenerator extends BaseKeyGenerator {
@Override
public String getRecordKey(GenericRecord record) {
- validateRecordKeyFields();
- return getRecordKeyFieldNames().size() == 1
- ? new SimpleAvroKeyGenerator(config).getRecordKey(record)
- : new ComplexAvroKeyGenerator(config).getRecordKey(record);
+ if (autoGenerateRecordKeys) {
+ return autoRecordKeyGenerator.getRecordKey(record);
+ } else {
+ validateRecordKeyFields();
+ return getRecordKeyFieldNames().size() == 1
+ ? new SimpleAvroKeyGenerator(config).getRecordKey(record)
+ : new ComplexAvroKeyGenerator(config).getRecordKey(record);
+ }
}
private void validateRecordKeyFields() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
index dc0bc3cef2f..193259ab7e0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
@@ -36,11 +36,17 @@ public class GlobalAvroDeleteKeyGenerator extends BaseKeyGenerator {
public GlobalAvroDeleteKeyGenerator(TypedProperties config) {
super(config);
this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","));
+ this.partitionPathFields = new ArrayList<>();
+ instantiateAutoRecordKeyGenerator();
}
@Override
public String getRecordKey(GenericRecord record) {
- return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
+ if (autoGenerateRecordKeys) {
+ return autoRecordKeyGenerator.getRecordKey(record);
+ } else {
+ return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
+ }
}
@Override
@@ -50,7 +56,7 @@ public class GlobalAvroDeleteKeyGenerator extends BaseKeyGenerator {
@Override
public List<String> getPartitionPathFields() {
- return new ArrayList<>();
+ return partitionPathFields;
}
public String getEmptyPartition() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index 5a8b2b01c5f..8c536de718d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -108,10 +108,12 @@ public class KeyGenUtils {
keyIsNullEmpty = false;
}
}
- recordKey.deleteCharAt(recordKey.length() - 1);
+ if (recordKey.length() > 0) {
+ recordKey.deleteCharAt(recordKey.length() - 1);
+ }
if (keyIsNullEmpty) {
throw new HoodieKeyException("recordKey values: \"" + recordKey + "\" for fields: "
- + recordKeyFields.toString() + " cannot be entirely null or empty.");
+ + recordKeyFields + " cannot be entirely null or empty.");
}
return recordKey.toString();
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
index 5b5cedcbf88..41426140ab5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
@@ -39,6 +39,7 @@ public class NonpartitionedAvroKeyGenerator extends BaseKeyGenerator {
this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
.split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
this.partitionPathFields = EMPTY_PARTITION_FIELD_LIST;
+ instantiateAutoRecordKeyGenerator();
}
@Override
@@ -53,13 +54,17 @@ public class NonpartitionedAvroKeyGenerator extends BaseKeyGenerator {
@Override
public String getRecordKey(GenericRecord record) {
- // for backward compatibility, we need to use the right format according to the number of record key fields
- // 1. if there is only one record key field, the format of record key is just "<value>"
- // 2. if there are multiple record key fields, the format is "<field1>:<value1>,<field2>:<value2>,..."
- if (getRecordKeyFieldNames().size() == 1) {
- return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
+ if (autoGenerateRecordKeys) {
+ return autoRecordKeyGenerator.getRecordKey(record);
+ } else {
+ // for backward compatibility, we need to use the right format according to the number of record key fields
+ // 1. if there is only one record key field, the format of record key is just "<value>"
+ // 2. if there are multiple record key fields, the format is "<field1>:<value1>,<field2>:<value2>,..."
+ if (getRecordKeyFieldNames().size() == 1) {
+ return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
+ }
+ return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
}
- return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
}
public String getEmptyPartition() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
index c7398e94ece..38123e39241 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
@@ -17,12 +17,15 @@
package org.apache.hudi.keygen;
-import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.avro.generic.GenericRecord;
+
import java.util.Collections;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
/**
* Avro simple key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
*/
@@ -43,11 +46,17 @@ public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
? Collections.emptyList()
: Collections.singletonList(recordKeyField);
this.partitionPathFields = Collections.singletonList(partitionPathField);
+ instantiateAutoRecordKeyGenerator();
}
@Override
public String getRecordKey(GenericRecord record) {
- return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
+ if (autoGenerateRecordKeys) {
+ return autoRecordKeyGenerator.getRecordKey(record);
+ } else {
+ checkArgument(getRecordKeyFieldNames().size() == 1, "Only 1 record key field allowed for SimpleKeyGenerator");
+ return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
+ }
}
@Override
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeylessKeyGenerator.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestAutoRecordKeyGenerator.java
similarity index 56%
rename from hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeylessKeyGenerator.java
rename to hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestAutoRecordKeyGenerator.java
index af6b30e3f09..6dc3c8483fb 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeylessKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestAutoRecordKeyGenerator.java
@@ -29,13 +29,16 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
-public class TestKeylessKeyGenerator {
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestAutoRecordKeyGenerator {
private static final long TIME = 1672265446090L;
private static final Schema SCHEMA;
+ private static final String PARTITION_PATH_STR = "partition1";
static {
try {
- SCHEMA = new Schema.Parser().parse(TestKeylessKeyGenerator.class.getClassLoader().getResourceAsStream("keyless_schema.avsc"));
+ SCHEMA = new Schema.Parser().parse(TestAutoRecordKeyGenerator.class.getClassLoader().getResourceAsStream("keyless_schema.avsc"));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
@@ -43,52 +46,59 @@ public class TestKeylessKeyGenerator {
@Test
public void createKeyWithoutPartitionColumn() {
- KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3));
- GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null);
+ ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3));
+ GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null);
String actualForRecord = keyGenerator.getRecordKey(record);
- Assertions.assertEquals("952f0fd4-17b6-3762-b0ea-aa76d36377f1", actualForRecord);
+ assertEquals("952f0fd4-17b6-3762-b0ea-aa76d36377f1", actualForRecord);
+ assertEquals(PARTITION_PATH_STR, keyGenerator.getPartitionPath(record));
}
@Test
public void createKeyWithPartition() {
- KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("integer_field:SIMPLE,partition_field:SIMPLE,nested_struct.doubly_nested:SIMPLE", 3));
- GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null);
+ ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("integer_field,partition_field,nested_struct.doubly_nested", 3));
+ GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null);
String actualForRecord = keyGenerator.getRecordKey(record);
- Assertions.assertEquals("5c1f9cac-c45d-3b57-9bf7-f745a4bb35c4", actualForRecord);
+ assertEquals("5c1f9cac-c45d-3b57-9bf7-f745a4bb35c4", actualForRecord);
+ assertEquals("123/partition1/__HIVE_DEFAULT_PARTITION__", keyGenerator.getPartitionPath(record));
}
@Test
public void nullFieldsProperlyHandled() {
- KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3));
- GenericRecord record = createRecord("partition1", "value1", null, null, null, null);
+ ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3));
+ GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", null, null, null, null);
String actualForRecord = keyGenerator.getRecordKey(record);
- Assertions.assertEquals("22dee533-e64f-3694-8242-5ec5f25e6d11", actualForRecord);
+ assertEquals("a107710e-4d3b-33a4-bbbf-d891c7147034", actualForRecord);
+ assertEquals(PARTITION_PATH_STR, keyGenerator.getPartitionPath(record));
}
@Test
public void assertOnlySubsetOfFieldsUsed() {
- KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3));
- GenericRecord record1 = createRecord("partition1", "value1", 123, 456L, TIME, null);
+ ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3));
+ GenericRecord record1 = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null);
String actualForRecord1 = keyGenerator.getRecordKey(record1);
GenericRecord record2 = createRecord("partition2", "value2", 123, 456L, TIME, null);
String actualForRecord2 = keyGenerator.getRecordKey(record2);
- Assertions.assertEquals(actualForRecord2, actualForRecord1);
+ assertEquals(actualForRecord2, actualForRecord1);
+ assertEquals("partition2", keyGenerator.getPartitionPath(record2));
}
@Test
public void numFieldsImpactsKeyGen() {
- KeylessKeyGenerator keyGenerator1 = new KeylessKeyGenerator(getKeyGenProperties("", 3));
- KeylessKeyGenerator keyGenerator2 = new KeylessKeyGenerator(getKeyGenProperties("", 10));
- GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null);
+ ComplexAvroKeyGenerator keyGenerator1 = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3));
+ ComplexAvroKeyGenerator keyGenerator2 = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 10));
+ GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null);
Assertions.assertNotEquals(keyGenerator1.getRecordKey(record), keyGenerator2.getRecordKey(record));
+ assertEquals(PARTITION_PATH_STR, keyGenerator1.getPartitionPath(record));
+ assertEquals(PARTITION_PATH_STR, keyGenerator2.getPartitionPath(record));
}
@Test
public void nestedColumnsUsed() {
- KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 10));
+ ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 10));
GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, 20.1);
String actualForRecord = keyGenerator.getRecordKey(record);
- Assertions.assertEquals("6bbd8811-6ea1-3ef1-840c-f7a51d8f378c", actualForRecord);
+ assertEquals("569de5d6-55b8-38bf-9256-efc0f6e2ae84", actualForRecord);
+ assertEquals(PARTITION_PATH_STR, keyGenerator.getPartitionPath(record));
}
protected GenericRecord createRecord(String partitionField, String stringValue, Integer integerValue, Long longValue, Long timestampValue, Double nestedDouble) {
@@ -112,8 +122,9 @@ public class TestKeylessKeyGenerator {
protected TypedProperties getKeyGenProperties(String partitionPathField, int numFieldsInKeyGen) {
TypedProperties properties = new TypedProperties();
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), partitionPathField);
- properties.put(KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.key(), numFieldsInKeyGen);
+ properties.put(KeyGeneratorOptions.NUM_FIELDS_IN_AUTO_RECORDKEY_GENERATION.key(), numFieldsInKeyGen);
properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "");
+ properties.put(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key(),"true");
return properties;
}
-}
+}
\ No newline at end of file
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
index c9cff284e80..9e8550643f9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
@@ -26,6 +26,7 @@ import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import java.util.Arrays;
+import java.util.Collections;
import java.util.stream.Collectors;
/**
@@ -41,14 +42,16 @@ public class ComplexKeyGenerator extends BuiltinKeyGenerator {
public ComplexKeyGenerator(TypedProperties props) {
super(props);
- this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(FIELDS_SEP))
+ this.recordKeyFields = props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
+ ? Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(FIELDS_SEP))
.map(String::trim)
.filter(s -> !s.isEmpty())
- .collect(Collectors.toList());
- this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(FIELDS_SEP))
+ .collect(Collectors.toList()) : Collections.EMPTY_LIST;
+ this.partitionPathFields = props.containsKey(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
+ ? Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(FIELDS_SEP))
.map(String::trim)
.filter(s -> !s.isEmpty())
- .collect(Collectors.toList());
+ .collect(Collectors.toList()) : Collections.EMPTY_LIST;
this.complexAvroKeyGenerator = new ComplexAvroKeyGenerator(props);
}
@@ -64,14 +67,22 @@ public class ComplexKeyGenerator extends BuiltinKeyGenerator {
@Override
public String getRecordKey(Row row) {
- tryInitRowAccessor(row.schema());
- return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row));
+ if (autoGenerateRecordKeys) {
+ return super.getRecordKey(row);
+ } else {
+ tryInitRowAccessor(row.schema());
+ return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row));
+ }
}
@Override
public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
- tryInitRowAccessor(schema);
- return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow));
+ if (autoGenerateRecordKeys) {
+ return super.getRecordKey(internalRow, schema);
+ } else {
+ tryInitRowAccessor(schema);
+ return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow));
+ }
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
index fcd94bb4f15..d65f8f8b5a5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
@@ -60,14 +60,15 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
// NOTE: We have to strip partition-path configuration, since it could only be interpreted by
// this key-gen
super(stripPartitionPathConfig(props));
- this.recordKeyFields =
- Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
+ String recordKeyField = props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
+ this.recordKeyFields = recordKeyField == null ? Collections.emptyList() :
+ Arrays.stream(recordKeyField.split(","))
.map(String::trim)
.collect(Collectors.toList());
- String partitionPathFields = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
- this.partitionPathFields = partitionPathFields == null
+ String partitionPathField = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
+ this.partitionPathFields = partitionPathField == null
? Collections.emptyList()
- : Arrays.stream(partitionPathFields.split(",")).map(String::trim).collect(Collectors.toList());
+ : Arrays.stream(partitionPathField.split(",")).map(String::trim).collect(Collectors.toList());
this.customAvroKeyGenerator = new CustomAvroKeyGenerator(props);
validateRecordKeyFields();
@@ -85,9 +86,13 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
@Override
public String getRecordKey(Row row) {
- return getRecordKeyFieldNames().size() == 1
- ? new SimpleKeyGenerator(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()), null).getRecordKey(row)
- : new ComplexKeyGenerator(config).getRecordKey(row);
+ if (autoGenerateRecordKeys) {
+ return super.getRecordKey(row);
+ } else {
+ return getRecordKeyFieldNames().size() == 1
+ ? new SimpleKeyGenerator(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()), null).getRecordKey(row)
+ : new ComplexKeyGenerator(config).getRecordKey(row);
+ }
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
index 7fcc16094ea..a301b58c682 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
@@ -29,6 +29,7 @@ import org.apache.spark.unsafe.types.UTF8String;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
/**
@@ -40,7 +41,8 @@ public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator {
private final GlobalAvroDeleteKeyGenerator globalAvroDeleteKeyGenerator;
public GlobalDeleteKeyGenerator(TypedProperties config) {
super(config);
- this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","));
+ this.recordKeyFields = config.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
+ ? Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")) : Collections.emptyList();
this.globalAvroDeleteKeyGenerator = new GlobalAvroDeleteKeyGenerator(config);
}
@@ -61,14 +63,22 @@ public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator {
@Override
public String getRecordKey(Row row) {
- tryInitRowAccessor(row.schema());
- return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row));
+ if (autoGenerateRecordKeys) {
+ return super.getRecordKey(row);
+ } else {
+ tryInitRowAccessor(row.schema());
+ return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row));
+ }
}
@Override
public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
- tryInitRowAccessor(schema);
- return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow));
+ if (autoGenerateRecordKeys) {
+ return super.getRecordKey(internalRow, schema);
+ } else {
+ tryInitRowAccessor(schema);
+ return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow));
+ }
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
index 100bcc2cd7f..c179285a2a1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
@@ -40,10 +40,11 @@ public class NonpartitionedKeyGenerator extends BuiltinKeyGenerator {
public NonpartitionedKeyGenerator(TypedProperties props) {
super(props);
- this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
+ this.recordKeyFields = config.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
+ ? Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
.split(","))
.map(String::trim)
- .collect(Collectors.toList());
+ .collect(Collectors.toList()) : Collections.emptyList();
this.partitionPathFields = Collections.emptyList();
this.nonpartitionedAvroKeyGenerator = new NonpartitionedAvroKeyGenerator(props);
}
@@ -60,14 +61,22 @@ public class NonpartitionedKeyGenerator extends BuiltinKeyGenerator {
@Override
public String getRecordKey(Row row) {
- tryInitRowAccessor(row.schema());
- return combineRecordKey(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(row)));
+ if (autoGenerateRecordKeys) {
+ return super.getRecordKey(row);
+ } else {
+ tryInitRowAccessor(row.schema());
+ return combineRecordKey(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(row)));
+ }
}
@Override
public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
- tryInitRowAccessor(schema);
- return combineRecordKeyUnsafe(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(internalRow)));
+ if (autoGenerateRecordKeys) {
+ return super.getRecordKey(internalRow, schema);
+ } else {
+ tryInitRowAccessor(schema);
+ return combineRecordKeyUnsafe(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(internalRow)));
+ }
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
index 4b1a5e5cb44..ea4e58cf769 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
@@ -18,9 +18,10 @@
package org.apache.hudi.keygen;
-import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
@@ -38,7 +39,7 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator {
private final SimpleAvroKeyGenerator simpleAvroKeyGenerator;
public SimpleKeyGenerator(TypedProperties props) {
- this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+ this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null),
props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
}
@@ -69,31 +70,40 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator {
@Override
public String getRecordKey(Row row) {
- tryInitRowAccessor(row.schema());
-
- Object[] recordKeys = rowAccessor.getRecordKeyParts(row);
- // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite)
- // record-key field
- if (recordKeys[0] == null) {
- return handleNullRecordKey(null);
+ if (autoGenerateRecordKeys) {
+ return super.getRecordKey(row);
} else {
- return requireNonNullNonEmptyKey(recordKeys[0].toString());
+ tryInitRowAccessor(row.schema());
+
+ Object[] recordKeys = rowAccessor.getRecordKeyParts(row);
+ // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite)
+ // record-key field
+ checkArgument(recordKeys != null && recordKeys.length > 0, "Record keys cannot be null or empty");
+ if (recordKeys[0] == null) {
+ return handleNullRecordKey(null);
+ } else {
+ return requireNonNullNonEmptyKey(recordKeys[0].toString());
+ }
}
}
@Override
public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
- tryInitRowAccessor(schema);
-
- Object[] recordKeyValues = rowAccessor.getRecordKeyParts(internalRow);
- // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite)
- // record-key field
- if (recordKeyValues[0] == null) {
- return handleNullRecordKey(null);
- } else if (recordKeyValues[0] instanceof UTF8String) {
- return requireNonNullNonEmptyKey((UTF8String) recordKeyValues[0]);
+ if (autoGenerateRecordKeys) {
+ return super.getRecordKey(internalRow, schema);
} else {
- return requireNonNullNonEmptyKey(UTF8String.fromString(recordKeyValues[0].toString()));
+ tryInitRowAccessor(schema);
+
+ Object[] recordKeyValues = rowAccessor.getRecordKeyParts(internalRow);
+ // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite)
+ // record-key field
+ if (recordKeyValues[0] == null) {
+ return handleNullRecordKey(null);
+ } else if (recordKeyValues[0] instanceof UTF8String) {
+ return requireNonNullNonEmptyKey((UTF8String) recordKeyValues[0]);
+ } else {
+ return requireNonNullNonEmptyKey(UTF8String.fromString(recordKeyValues[0].toString()));
+ }
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
index fa36f2152cb..1507cf85e14 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
@@ -41,7 +41,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
private final TimestampBasedAvroKeyGenerator timestampBasedAvroKeyGenerator;
public TimestampBasedKeyGenerator(TypedProperties config) throws IOException {
- this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+ this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null),
config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
}
@@ -61,14 +61,22 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
@Override
public String getRecordKey(Row row) {
- tryInitRowAccessor(row.schema());
- return combineRecordKey(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(row)));
+ if (autoGenerateRecordKeys) {
+ return super.getRecordKey(row);
+ } else {
+ tryInitRowAccessor(row.schema());
+ return combineRecordKey(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(row)));
+ }
}
@Override
public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
- tryInitRowAccessor(schema);
- return combineRecordKeyUnsafe(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(internalRow)));
+ if (autoGenerateRecordKeys) {
+ return super.getRecordKey(internalRow, schema);
+ } else {
+ tryInitRowAccessor(schema);
+ return combineRecordKeyUnsafe(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(internalRow)));
+ }
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
index 4cdbbf7577a..4e4c03402d1 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
@@ -48,7 +48,7 @@ object SparkKeyGenUtils {
case c: BaseKeyGenerator
if c.isInstanceOf[CustomKeyGenerator] || c.isInstanceOf[CustomAvroKeyGenerator] =>
c.getPartitionPathFields.asScala.map(pathField =>
- pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
+ pathField.split(BaseKeyGenerator.SPLIT_REGEX)
.headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}"))
.mkString(",")
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeylessKeyGenerator.java b/hudi-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java
similarity index 81%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeylessKeyGenerator.java
rename to hudi-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java
index d487e7e1ff9..787dedd1809 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeylessKeyGenerator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java
@@ -7,13 +7,14 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.hudi.keygen;
@@ -25,6 +26,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Arrays;
@@ -37,18 +39,10 @@ import java.util.UUID;
import java.util.stream.Collectors;
/**
- * This class is used to compute a deterministic key for a record based on the contents of the field. Unlike the other KeyGenerators in Hudi, this class does not take in any field names as args to
- * create a "keyless" experience for insert only workloads. The keys are guaranteed to be deterministic but not unique, so they can only be used for insert workflows with deduplication disabled.
- * The class attempts to get sufficient uniqueness for keys to prevent frequent collisions by choosing the fields it uses in order of decreasing likelihood for uniqueness. The ordering is:
- * <ul>
- * <li>timestamp</li>
- * <li>numeric values</li>
- * <li>string, byte arrays, other types not mentioned</li>
- * <li>date, lists, maps, booleans</li>
- * </ul>
- * The number of fields is capped to created predictable performance and the generator only uses non-null values to help increase uniqueness for sparse datasets.
+ * Class to assist in generating auto record keys for hudi records: deterministic (but not necessarily unique) keys computed from record contents.
*/
-public class KeylessKeyGenerator extends CustomAvroKeyGenerator {
+public class AutoRecordKeyGenerator implements Serializable {
+
private static final String HOODIE_PREFIX = "_hoodie";
private static final String DOT = ".";
private final int maxFieldsToConsider;
@@ -56,15 +50,20 @@ public class KeylessKeyGenerator extends CustomAvroKeyGenerator {
private final Set<String> partitionFieldNames;
private int[][] fieldOrdering;
- public KeylessKeyGenerator(TypedProperties props) {
- super(props);
- this.numFieldsForKey = props.getInteger(KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.key(), KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.defaultValue());
+ public AutoRecordKeyGenerator(TypedProperties config, List<String> partitionPathFields) {
+ this.numFieldsForKey = config.getInteger(KeyGeneratorOptions.NUM_FIELDS_IN_AUTO_RECORDKEY_GENERATION.key(),
+ KeyGeneratorOptions.NUM_FIELDS_IN_AUTO_RECORDKEY_GENERATION.defaultValue());
// cap the number of fields to order in case of large schemas
this.maxFieldsToConsider = numFieldsForKey * 3;
- this.partitionFieldNames = this.getPartitionPathFields().stream().map(field -> field.split(SPLIT_REGEX)[0]).collect(Collectors.toSet());
+ this.partitionFieldNames = partitionPathFields.stream().map(field -> {
+ if (field.contains(BaseKeyGenerator.SPLIT_REGEX)) {
+ return field.split(BaseKeyGenerator.SPLIT_REGEX)[0];
+ } else {
+ return field;
+ }
+ }).collect(Collectors.toSet());
}
- @Override
public String getRecordKey(GenericRecord record) {
return buildKey(getFieldOrdering(record), record);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
index d0baa903919..4a19dee9989 100644
--- a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
@@ -32,11 +32,16 @@ import java.util.List;
*/
public abstract class BaseKeyGenerator extends KeyGenerator {
+ protected static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+ public static final String SPLIT_REGEX = ":";
+
protected List<String> recordKeyFields;
protected List<String> partitionPathFields;
protected final boolean encodePartitionPath;
protected final boolean hiveStylePartitioning;
protected final boolean consistentLogicalTimestampEnabled;
+ protected final boolean autoGenerateRecordKeys;
+ protected AutoRecordKeyGenerator autoRecordKeyGenerator;
protected BaseKeyGenerator(TypedProperties config) {
super(config);
@@ -46,6 +51,8 @@ public abstract class BaseKeyGenerator extends KeyGenerator {
Boolean.parseBoolean(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.defaultValue()));
this.consistentLogicalTimestampEnabled = config.getBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
+ this.autoGenerateRecordKeys = config.getBoolean(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key(),
+ Boolean.parseBoolean(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.defaultValue()));
}
/**
@@ -81,4 +88,10 @@ public abstract class BaseKeyGenerator extends KeyGenerator {
public boolean isConsistentLogicalTimestampEnabled() {
return consistentLogicalTimestampEnabled;
}
+
+ protected void instantiateAutoRecordKeyGenerator() {
+ if (autoGenerateRecordKeys) {
+ autoRecordKeyGenerator = new AutoRecordKeyGenerator(config, getPartitionPathFields());
+ }
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
index b0a46ac0676..911fd66db46 100644
--- a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
@@ -69,11 +69,21 @@ public class KeyGeneratorOptions extends HoodieConfig {
+ "`2016-12-29 09:54:00.0` in row-writer path, while it will be written as long value `1483023240000000` in non row-writer path. "
+ "If enabled, then the timestamp value will be written in both the cases.");
- public static final ConfigProperty<Integer> NUM_FIELDS_IN_KEYLESS_GENERATOR = ConfigProperty
- .key("hoodie.datasource.write.recordkey.keyless.field.count")
+ public static final ConfigProperty<String> AUTO_GENERATE_RECORD_KEYS = ConfigProperty
+ .key("hoodie.auto.generate.record.keys")
+ .defaultValue("false")
+ .sinceVersion("0.13.0")
+ .withDocumentation("When enabled, hudi will auto generate a deterministic key for a record based on the contents of its fields. "
+ + "The keys are guaranteed to be deterministic but not unique, so they can only be used for insert workflows with deduplication disabled. "
+ + "Hudi attempts to get sufficient uniqueness for keys to prevent frequent collisions by choosing the fields it uses in order of decreasing "
+ + "likelihood for uniqueness.");
+
+ public static final ConfigProperty<Integer> NUM_FIELDS_IN_AUTO_RECORDKEY_GENERATION = ConfigProperty
+ .key("hoodie.datasource.write.auto.recordkey.num.fields")
.defaultValue(5)
- .withDocumentation("When using the KeylessKeyGenerator, this sets the number of fields to use when computing the UUID for the record. "
- + "Increasing the value will increase the randomness of the generated key but can impact performance.");
+ .withDocumentation("When enabling auto generation of record keys (hoodie.auto.generate.record.keys), this sets the number of fields "
+ + "to use when computing the UUID for the record. Increasing the value will increase the randomness of the generated key but can "
+ + "impact performance.");
/**
* @deprecated Use {@link #URL_ENCODE_PARTITIONING} and its methods.
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
index caed61249a1..4cfa619d627 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
@@ -77,7 +77,8 @@ public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
@Test
public void testNullRecordKeyFields() {
- Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+ ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp());
+ Assertions.assertThrows(IllegalArgumentException.class, () -> keyGenerator.getRecordKey(getRecord()));
}
@Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
index 77d1b34f136..5e7a9f0b8ea 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
@@ -96,7 +96,8 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities {
@Test
public void testNullRecordKeyFields() {
- assertThrows(IllegalArgumentException.class, () -> new SimpleKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+ SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getPropertiesWithoutRecordKeyProp());
+ assertThrows(IllegalArgumentException.class, () -> keyGenerator.getRecordKey(getRecord()));
}
@Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoRecordKeyGeneration.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoRecordKeyGeneration.scala
new file mode 100644
index 00000000000..543b4f31c19
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoRecordKeyGeneration.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.keygen.TimestampBasedKeyGenerator
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.spark.sql._
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.CsvSource
+
+import scala.collection.JavaConversions._
+
+/**
+ * Tests auto generation of record keys.
+ */
+class TestAutoRecordKeyGeneration extends SparkClientFunctionalTestHarness {
+
+ var commonOpts: Map[String, String] = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ "hoodie.bulkinsert.shuffle.parallelism" -> "4",
+ "hoodie.delete.shuffle.parallelism" -> "2",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+ )
+
+ @ParameterizedTest
+ @CsvSource(value = Array(
+ "COPY_ON_WRITE|org.apache.hudi.keygen.SimpleKeyGenerator",
+ "COPY_ON_WRITE|org.apache.hudi.keygen.ComplexKeyGenerator",
+ "COPY_ON_WRITE|org.apache.hudi.keygen.TimestampBasedKeyGenerator",
+ "MERGE_ON_READ|org.apache.hudi.keygen.SimpleKeyGenerator",
+ "MERGE_ON_READ|org.apache.hudi.keygen.ComplexKeyGenerator",
+ "MERGE_ON_READ|org.apache.hudi.keygen.TimestampBasedKeyGenerator"
+ ), delimiter = '|')
+ def testRecordKeyGeneration(tableType: String, keyGenClass: String): Unit = {
+ var options: Map[String, String] = commonOpts +
+ (DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass) +
+ (DataSourceWriteOptions.TABLE_TYPE.key() -> tableType) +
+ (KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key() -> "true") +
+ (HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key() -> "true")
+
+ if (keyGenClass == classOf[TimestampBasedKeyGenerator].getName) {
+ options ++= Map(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING",
+ KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyy-MM-dd",
+ KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd")
+ }
+
+ val dataGen = new HoodieTestDataGenerator(0xDEED)
+ val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
+ // Bulk Insert Operation
+ val records0 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+ val inputDf0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
+ inputDf0.write.format("org.apache.hudi")
+ .options(options)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+
+ // Snapshot query
+ val snapshotDf0 = spark.read.format("org.apache.hudi")
+ .load(basePath)
+ assertEquals(100, snapshotDf0.count())
+
+ // Insert
+ val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+ val inputDf1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+ inputDf1.write.format("org.apache.hudi")
+ .options(options)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "001"))
+
+ // Snapshot query
+ val snapshotDf1 = spark.read.format("org.apache.hudi")
+ .load(basePath)
+ assertEquals(200, snapshotDf1.count())
+
+ // even though we generate updates, since auto generation of record keys is enabled, they should be written as new records
+ val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 50)).toList
+ val updateDf = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+ updateDf.write.format("org.apache.hudi")
+ .options(options)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ val snapshotDf2 = spark.read.format("hudi")
+ .load(basePath)
+ assertEquals(250, snapshotDf2.count())
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
index 1bb81f7f92e..12c1179fde2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
@@ -95,7 +95,7 @@ class TestDataSourceDefaults extends ScalaAssertionSupport {
props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "partitionField")
assertThrows(classOf[IllegalArgumentException]) {
- new SimpleKeyGenerator(props)
+ new SimpleKeyGenerator(props).getRecordKey(baseRow)
}
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index c2073c703d9..e95d8bab344 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -62,6 +62,7 @@ import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncClient;
import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.HoodieClusteringJob;
@@ -2421,6 +2422,32 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist");
}
+ @Test
+ public void testAutoGenerateRecordKeys() throws Exception {
+ boolean useSchemaProvider = false;
+ List<String> transformerClassNames = null;
+ PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
+ int parquetRecordsCount = 100;
+ boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty();
+ prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
+ prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+ PARQUET_SOURCE_ROOT, false, "partition_path", "");
+
+ String tableBasePath = basePath + "/test_parquet_table" + testNum;
+ HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+ transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
+ useSchemaProvider, 100000, false, null, null, "timestamp", null);
+ config.configs.add(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key() + "=true");
+ HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(config, jsc);
+ deltaStreamer.sync();
+ TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+
+ prepareParquetDFSFiles(200, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
+ deltaStreamer.sync();
+ TestHelpers.assertRecordCount(parquetRecordsCount + 200, tableBasePath, sqlContext);
+ testNum++;
+ }
+
class TestDeltaSync extends DeltaSync {
public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props,