You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2023/01/24 14:35:07 UTC

[hudi] branch master updated: [HUDI-5575] Adding/Fixing auto generation of record keys w/ hudi (#7726)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fc20c186b7 [HUDI-5575] Adding/Fixing auto generation of record keys w/ hudi (#7726)
2fc20c186b7 is described below

commit 2fc20c186b77017f7f1c6f6abe8559a9e8cfe578
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Tue Jan 24 06:34:55 2023 -0800

    [HUDI-5575] Adding/Fixing auto generation of record keys w/ hudi (#7726)
    
    * Adding auto generation of record keys w/ hudi. Added support to all key gen classes
    
    * addressing feedback
    
    * Fix tests
    
    Co-authored-by: Sagar Sumit <sa...@gmail.com>
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   9 ++
 .../hudi/keygen/ComplexAvroKeyGenerator.java       |  19 +++-
 .../apache/hudi/keygen/CustomAvroKeyGenerator.java |  16 +--
 .../hudi/keygen/GlobalAvroDeleteKeyGenerator.java  |  10 +-
 .../java/org/apache/hudi/keygen/KeyGenUtils.java   |   6 +-
 .../keygen/NonpartitionedAvroKeyGenerator.java     |  17 +--
 .../apache/hudi/keygen/SimpleAvroKeyGenerator.java |  13 ++-
 ...erator.java => TestAutoRecordKeyGenerator.java} |  53 +++++----
 .../apache/hudi/keygen/ComplexKeyGenerator.java    |  27 +++--
 .../org/apache/hudi/keygen/CustomKeyGenerator.java |  21 ++--
 .../hudi/keygen/GlobalDeleteKeyGenerator.java      |  20 +++-
 .../hudi/keygen/NonpartitionedKeyGenerator.java    |  21 ++--
 .../org/apache/hudi/keygen/SimpleKeyGenerator.java |  50 +++++----
 .../hudi/keygen/TimestampBasedKeyGenerator.java    |  18 +++-
 .../org/apache/hudi/util/SparkKeyGenUtils.scala    |   2 +-
 .../apache/hudi/keygen/AutoRecordKeyGenerator.java |  43 ++++----
 .../org/apache/hudi/keygen/BaseKeyGenerator.java   |  13 +++
 .../hudi/keygen/constant/KeyGeneratorOptions.java  |  18 +++-
 .../hudi/keygen/TestComplexKeyGenerator.java       |   3 +-
 .../apache/hudi/keygen/TestSimpleKeyGenerator.java |   3 +-
 .../apache/hudi/TestAutoRecordKeyGeneration.scala  | 119 +++++++++++++++++++++
 .../org/apache/hudi/TestDataSourceDefaults.scala   |   2 +-
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  27 +++++
 23 files changed, 404 insertions(+), 126 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d114cc3d778..8213f020f2a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2259,6 +2259,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE));
   }
 
+  public Boolean doAutoGenerateRecordKeys() {
+    return getBooleanOrDefault(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS);
+  }
+
   public boolean isEarlyConflictDetectionEnable() {
     return getBoolean(EARLY_CONFLICT_DETECTION_ENABLE);
   }
@@ -2805,6 +2809,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withAutoGenerateRecordKeys(boolean autoGenerateRecordKeys) {
+      writeConfig.setValue(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS, String.valueOf(autoGenerateRecordKeys));
+      return this;
+    }
+
     public Builder withEarlyConflictDetectionEnable(boolean enable) {
       writeConfig.setValue(EARLY_CONFLICT_DETECTION_ENABLE, String.valueOf(enable));
       return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
index 9ff5c522e45..91449a07b70 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
@@ -17,13 +17,17 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
+import org.apache.avro.generic.GenericRecord;
+
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
 /**
  * Avro complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
  */
@@ -32,19 +36,26 @@ public class ComplexAvroKeyGenerator extends BaseKeyGenerator {
 
   public ComplexAvroKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
+    this.recordKeyFields = props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
+        ? Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
         .map(String::trim)
         .filter(s -> !s.isEmpty())
-        .collect(Collectors.toList());
+        .collect(Collectors.toList()) : Collections.EMPTY_LIST;
     this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(","))
         .map(String::trim)
         .filter(s -> !s.isEmpty())
         .collect(Collectors.toList());
+    instantiateAutoRecordKeyGenerator();
   }
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
+    if (autoGenerateRecordKeys) {
+      return autoRecordKeyGenerator.getRecordKey(record);
+    } else {
+      checkArgument(getRecordKeyFieldNames().size() > 0, "Record key fields cannot be empty");
+      return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
+    }
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
index 77377de7ab8..2c829f6bb98 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
@@ -43,9 +43,6 @@ import java.util.stream.Collectors;
  */
 public class CustomAvroKeyGenerator extends BaseKeyGenerator {
 
-  private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
-  public static final String SPLIT_REGEX = ":";
-
   /**
    * Used as a part of config in CustomKeyGenerator.java.
    */
@@ -57,6 +54,7 @@ public class CustomAvroKeyGenerator extends BaseKeyGenerator {
     super(props);
     this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList());
     this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(",")).map(String::trim).collect(Collectors.toList());
+    instantiateAutoRecordKeyGenerator();
   }
 
   @Override
@@ -102,10 +100,14 @@ public class CustomAvroKeyGenerator extends BaseKeyGenerator {
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    validateRecordKeyFields();
-    return getRecordKeyFieldNames().size() == 1
-        ? new SimpleAvroKeyGenerator(config).getRecordKey(record)
-        : new ComplexAvroKeyGenerator(config).getRecordKey(record);
+    if (autoGenerateRecordKeys) {
+      return autoRecordKeyGenerator.getRecordKey(record);
+    } else {
+      validateRecordKeyFields();
+      return getRecordKeyFieldNames().size() == 1
+          ? new SimpleAvroKeyGenerator(config).getRecordKey(record)
+          : new ComplexAvroKeyGenerator(config).getRecordKey(record);
+    }
   }
 
   private void validateRecordKeyFields() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
index dc0bc3cef2f..193259ab7e0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
@@ -36,11 +36,17 @@ public class GlobalAvroDeleteKeyGenerator extends BaseKeyGenerator {
   public GlobalAvroDeleteKeyGenerator(TypedProperties config) {
     super(config);
     this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","));
+    this.partitionPathFields = new ArrayList<>();
+    instantiateAutoRecordKeyGenerator();
   }
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
+    if (autoGenerateRecordKeys) {
+      return autoRecordKeyGenerator.getRecordKey(record);
+    } else {
+      return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
+    }
   }
 
   @Override
@@ -50,7 +56,7 @@ public class GlobalAvroDeleteKeyGenerator extends BaseKeyGenerator {
 
   @Override
   public List<String> getPartitionPathFields() {
-    return new ArrayList<>();
+    return partitionPathFields;
   }
 
   public String getEmptyPartition() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index 5a8b2b01c5f..8c536de718d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -108,10 +108,12 @@ public class KeyGenUtils {
         keyIsNullEmpty = false;
       }
     }
-    recordKey.deleteCharAt(recordKey.length() - 1);
+    if (recordKey.length() > 0) {
+      recordKey.deleteCharAt(recordKey.length() - 1);
+    }
     if (keyIsNullEmpty) {
       throw new HoodieKeyException("recordKey values: \"" + recordKey + "\" for fields: "
-          + recordKeyFields.toString() + " cannot be entirely null or empty.");
+          + recordKeyFields + " cannot be entirely null or empty.");
     }
     return recordKey.toString();
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
index 5b5cedcbf88..41426140ab5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
@@ -39,6 +39,7 @@ public class NonpartitionedAvroKeyGenerator extends BaseKeyGenerator {
     this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
         .split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
     this.partitionPathFields = EMPTY_PARTITION_FIELD_LIST;
+    instantiateAutoRecordKeyGenerator();
   }
 
   @Override
@@ -53,13 +54,17 @@ public class NonpartitionedAvroKeyGenerator extends BaseKeyGenerator {
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    // for backward compatibility, we need to use the right format according to the number of record key fields
-    // 1. if there is only one record key field, the format of record key is just "<value>"
-    // 2. if there are multiple record key fields, the format is "<field1>:<value1>,<field2>:<value2>,..."
-    if (getRecordKeyFieldNames().size() == 1) {
-      return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
+    if (autoGenerateRecordKeys) {
+      return autoRecordKeyGenerator.getRecordKey(record);
+    } else {
+      // for backward compatibility, we need to use the right format according to the number of record key fields
+      // 1. if there is only one record key field, the format of record key is just "<value>"
+      // 2. if there are multiple record key fields, the format is "<field1>:<value1>,<field2>:<value2>,..."
+      if (getRecordKeyFieldNames().size() == 1) {
+        return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
+      }
+      return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
     }
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
   }
 
   public String getEmptyPartition() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
index c7398e94ece..38123e39241 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
@@ -17,12 +17,15 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
+import org.apache.avro.generic.GenericRecord;
+
 import java.util.Collections;
 
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
 /**
  * Avro simple key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
  */
@@ -43,11 +46,17 @@ public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
         ? Collections.emptyList()
         : Collections.singletonList(recordKeyField);
     this.partitionPathFields = Collections.singletonList(partitionPathField);
+    instantiateAutoRecordKeyGenerator();
   }
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
+    if (autoGenerateRecordKeys) {
+      return autoRecordKeyGenerator.getRecordKey(record);
+    } else {
+      checkArgument(getRecordKeyFieldNames().size() == 1, "Only 1 record key field allowed for SimpleKeyGenerator");
+      return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
+    }
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeylessKeyGenerator.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestAutoRecordKeyGenerator.java
similarity index 56%
rename from hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeylessKeyGenerator.java
rename to hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestAutoRecordKeyGenerator.java
index af6b30e3f09..6dc3c8483fb 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeylessKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestAutoRecordKeyGenerator.java
@@ -29,13 +29,16 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 
-public class TestKeylessKeyGenerator {
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestAutoRecordKeyGenerator {
   private static final long TIME = 1672265446090L;
   private static final Schema SCHEMA;
+  private static final String PARTITION_PATH_STR = "partition1";
 
   static {
     try {
-      SCHEMA = new Schema.Parser().parse(TestKeylessKeyGenerator.class.getClassLoader().getResourceAsStream("keyless_schema.avsc"));
+      SCHEMA = new Schema.Parser().parse(TestAutoRecordKeyGenerator.class.getClassLoader().getResourceAsStream("keyless_schema.avsc"));
     } catch (IOException ex) {
       throw new RuntimeException(ex);
     }
@@ -43,52 +46,59 @@ public class TestKeylessKeyGenerator {
 
   @Test
   public void createKeyWithoutPartitionColumn() {
-    KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3));
-    GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null);
+    ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3));
+    GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null);
     String actualForRecord = keyGenerator.getRecordKey(record);
-    Assertions.assertEquals("952f0fd4-17b6-3762-b0ea-aa76d36377f1", actualForRecord);
+    assertEquals("952f0fd4-17b6-3762-b0ea-aa76d36377f1", actualForRecord);
+    assertEquals(PARTITION_PATH_STR, keyGenerator.getPartitionPath(record));
   }
 
   @Test
   public void createKeyWithPartition() {
-    KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("integer_field:SIMPLE,partition_field:SIMPLE,nested_struct.doubly_nested:SIMPLE", 3));
-    GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null);
+    ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("integer_field,partition_field,nested_struct.doubly_nested", 3));
+    GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null);
     String actualForRecord = keyGenerator.getRecordKey(record);
-    Assertions.assertEquals("5c1f9cac-c45d-3b57-9bf7-f745a4bb35c4", actualForRecord);
+    assertEquals("5c1f9cac-c45d-3b57-9bf7-f745a4bb35c4", actualForRecord);
+    assertEquals("123/partition1/__HIVE_DEFAULT_PARTITION__", keyGenerator.getPartitionPath(record));
   }
 
   @Test
   public void nullFieldsProperlyHandled() {
-    KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3));
-    GenericRecord record = createRecord("partition1", "value1", null, null, null, null);
+    ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3));
+    GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", null, null, null, null);
     String actualForRecord = keyGenerator.getRecordKey(record);
-    Assertions.assertEquals("22dee533-e64f-3694-8242-5ec5f25e6d11", actualForRecord);
+    assertEquals("a107710e-4d3b-33a4-bbbf-d891c7147034", actualForRecord);
+    assertEquals(PARTITION_PATH_STR, keyGenerator.getPartitionPath(record));
   }
 
   @Test
   public void assertOnlySubsetOfFieldsUsed() {
-    KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3));
-    GenericRecord record1 = createRecord("partition1", "value1", 123, 456L, TIME, null);
+    ComplexAvroKeyGenerator keyGenerator = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3));
+    GenericRecord record1 = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null);
     String actualForRecord1 = keyGenerator.getRecordKey(record1);
     GenericRecord record2 = createRecord("partition2", "value2", 123, 456L, TIME, null);
     String actualForRecord2 = keyGenerator.getRecordKey(record2);
-    Assertions.assertEquals(actualForRecord2, actualForRecord1);
+    assertEquals(actualForRecord2, actualForRecord1);
+    assertEquals("partition2", keyGenerator.getPartitionPath(record2));
   }
 
   @Test
   public void numFieldsImpactsKeyGen() {
-    KeylessKeyGenerator keyGenerator1 = new KeylessKeyGenerator(getKeyGenProperties("", 3));
-    KeylessKeyGenerator keyGenerator2 = new KeylessKeyGenerator(getKeyGenProperties("", 10));
-    GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null);
+    ComplexAvroKeyGenerator keyGenerator1 = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 3));
+    ComplexAvroKeyGenerator keyGenerator2 = new ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 10));
+    GenericRecord record = createRecord(PARTITION_PATH_STR, "value1", 123, 456L, TIME, null);
     Assertions.assertNotEquals(keyGenerator1.getRecordKey(record), keyGenerator2.getRecordKey(record));
+    assertEquals(PARTITION_PATH_STR, keyGenerator1.getPartitionPath(record));
+    assertEquals(PARTITION_PATH_STR, keyGenerator2.getPartitionPath(record));
   }
 
   @Test
   public void nestedColumnsUsed() {
-    KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 10));
+    ComplexAvroKeyGenerator keyGenerator = new  ComplexAvroKeyGenerator(getKeyGenProperties("partition_field", 10));
     GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, 20.1);
     String actualForRecord = keyGenerator.getRecordKey(record);
-    Assertions.assertEquals("6bbd8811-6ea1-3ef1-840c-f7a51d8f378c", actualForRecord);
+    assertEquals("569de5d6-55b8-38bf-9256-efc0f6e2ae84", actualForRecord);
+    assertEquals(PARTITION_PATH_STR, keyGenerator.getPartitionPath(record));
   }
 
   protected GenericRecord createRecord(String partitionField, String stringValue, Integer integerValue, Long longValue, Long timestampValue, Double nestedDouble) {
@@ -112,8 +122,9 @@ public class TestKeylessKeyGenerator {
   protected TypedProperties getKeyGenProperties(String partitionPathField, int numFieldsInKeyGen) {
     TypedProperties properties = new TypedProperties();
     properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), partitionPathField);
-    properties.put(KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.key(), numFieldsInKeyGen);
+    properties.put(KeyGeneratorOptions.NUM_FIELDS_IN_AUTO_RECORDKEY_GENERATION.key(), numFieldsInKeyGen);
     properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "");
+    properties.put(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key(),"true");
     return properties;
   }
-}
+}
\ No newline at end of file
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
index c9cff284e80..9e8550643f9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
@@ -26,6 +26,7 @@ import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.stream.Collectors;
 
 /**
@@ -41,14 +42,16 @@ public class ComplexKeyGenerator extends BuiltinKeyGenerator {
 
   public ComplexKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(FIELDS_SEP))
+    this.recordKeyFields = props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
+        ? Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(FIELDS_SEP))
         .map(String::trim)
         .filter(s -> !s.isEmpty())
-        .collect(Collectors.toList());
-    this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(FIELDS_SEP))
+        .collect(Collectors.toList()) : Collections.EMPTY_LIST;
+    this.partitionPathFields = props.containsKey(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
+        ? Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(FIELDS_SEP))
         .map(String::trim)
         .filter(s -> !s.isEmpty())
-        .collect(Collectors.toList());
+        .collect(Collectors.toList()) : Collections.EMPTY_LIST;
     this.complexAvroKeyGenerator = new ComplexAvroKeyGenerator(props);
   }
 
@@ -64,14 +67,22 @@ public class ComplexKeyGenerator extends BuiltinKeyGenerator {
 
   @Override
   public String getRecordKey(Row row) {
-    tryInitRowAccessor(row.schema());
-    return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row));
+    if (autoGenerateRecordKeys) {
+      return super.getRecordKey(row);
+    } else {
+      tryInitRowAccessor(row.schema());
+      return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row));
+    }
   }
 
   @Override
   public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
-    tryInitRowAccessor(schema);
-    return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow));
+    if (autoGenerateRecordKeys) {
+      return super.getRecordKey(internalRow, schema);
+    } else {
+      tryInitRowAccessor(schema);
+      return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow));
+    }
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
index fcd94bb4f15..d65f8f8b5a5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
@@ -60,14 +60,15 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
     // NOTE: We have to strip partition-path configuration, since it could only be interpreted by
     //       this key-gen
     super(stripPartitionPathConfig(props));
-    this.recordKeyFields =
-        Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
+    String recordKeyField = props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
+    this.recordKeyFields = recordKeyField == null ? Collections.emptyList() :
+        Arrays.stream(recordKeyField.split(","))
             .map(String::trim)
             .collect(Collectors.toList());
-    String partitionPathFields = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
-    this.partitionPathFields = partitionPathFields == null
+    String partitionPathField = props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
+    this.partitionPathFields = partitionPathField == null
         ? Collections.emptyList()
-        : Arrays.stream(partitionPathFields.split(",")).map(String::trim).collect(Collectors.toList());
+        : Arrays.stream(partitionPathField.split(",")).map(String::trim).collect(Collectors.toList());
     this.customAvroKeyGenerator = new CustomAvroKeyGenerator(props);
 
     validateRecordKeyFields();
@@ -85,9 +86,13 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
 
   @Override
   public String getRecordKey(Row row) {
-    return getRecordKeyFieldNames().size() == 1
-        ? new SimpleKeyGenerator(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()), null).getRecordKey(row)
-        : new ComplexKeyGenerator(config).getRecordKey(row);
+    if (autoGenerateRecordKeys) {
+      return super.getRecordKey(row);
+    } else {
+      return getRecordKeyFieldNames().size() == 1
+          ? new SimpleKeyGenerator(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()), null).getRecordKey(row)
+          : new ComplexKeyGenerator(config).getRecordKey(row);
+    }
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
index 7fcc16094ea..a301b58c682 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
@@ -29,6 +29,7 @@ import org.apache.spark.unsafe.types.UTF8String;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -40,7 +41,8 @@ public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator {
   private final GlobalAvroDeleteKeyGenerator globalAvroDeleteKeyGenerator;
   public GlobalDeleteKeyGenerator(TypedProperties config) {
     super(config);
-    this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","));
+    this.recordKeyFields = config.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
+        ? Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(",")) : Collections.emptyList();
     this.globalAvroDeleteKeyGenerator = new GlobalAvroDeleteKeyGenerator(config);
   }
 
@@ -61,14 +63,22 @@ public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator {
 
   @Override
   public String getRecordKey(Row row) {
-    tryInitRowAccessor(row.schema());
-    return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row));
+    if (autoGenerateRecordKeys) {
+      return super.getRecordKey(row);
+    } else {
+      tryInitRowAccessor(row.schema());
+      return combineCompositeRecordKey(rowAccessor.getRecordKeyParts(row));
+    }
   }
 
   @Override
   public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
-    tryInitRowAccessor(schema);
-    return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow));
+    if (autoGenerateRecordKeys) {
+      return super.getRecordKey(internalRow, schema);
+    } else {
+      tryInitRowAccessor(schema);
+      return combineCompositeRecordKeyUnsafe(rowAccessor.getRecordKeyParts(internalRow));
+    }
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
index 100bcc2cd7f..c179285a2a1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
@@ -40,10 +40,11 @@ public class NonpartitionedKeyGenerator extends BuiltinKeyGenerator {
 
   public NonpartitionedKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
+    this.recordKeyFields = config.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
+        ? Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
         .split(","))
         .map(String::trim)
-        .collect(Collectors.toList());
+        .collect(Collectors.toList()) : Collections.emptyList();
     this.partitionPathFields = Collections.emptyList();
     this.nonpartitionedAvroKeyGenerator = new NonpartitionedAvroKeyGenerator(props);
   }
@@ -60,14 +61,22 @@ public class NonpartitionedKeyGenerator extends BuiltinKeyGenerator {
 
   @Override
   public String getRecordKey(Row row) {
-    tryInitRowAccessor(row.schema());
-    return combineRecordKey(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(row)));
+    if (autoGenerateRecordKeys) {
+      return super.getRecordKey(row);
+    } else {
+      tryInitRowAccessor(row.schema());
+      return combineRecordKey(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(row)));
+    }
   }
 
   @Override
   public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
-    tryInitRowAccessor(schema);
-    return combineRecordKeyUnsafe(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(internalRow)));
+    if (autoGenerateRecordKeys) {
+      return super.getRecordKey(internalRow, schema);
+    } else {
+      tryInitRowAccessor(schema);
+      return combineRecordKeyUnsafe(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(internalRow)));
+    }
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
index 4b1a5e5cb44..ea4e58cf769 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
@@ -18,9 +18,10 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
@@ -38,7 +39,7 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator {
   private final SimpleAvroKeyGenerator simpleAvroKeyGenerator;
 
   public SimpleKeyGenerator(TypedProperties props) {
-    this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+    this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null),
         props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
   }
 
@@ -69,31 +70,40 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator {
 
   @Override
   public String getRecordKey(Row row) {
-    tryInitRowAccessor(row.schema());
-
-    Object[] recordKeys = rowAccessor.getRecordKeyParts(row);
-    // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite)
-    //       record-key field
-    if (recordKeys[0] == null) {
-      return handleNullRecordKey(null);
+    if (autoGenerateRecordKeys) {
+      return super.getRecordKey(row);
     } else {
-      return requireNonNullNonEmptyKey(recordKeys[0].toString());
+      tryInitRowAccessor(row.schema());
+
+      Object[] recordKeys = rowAccessor.getRecordKeyParts(row);
+      // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite)
+      //       record-key field
+      checkArgument(recordKeys != null && recordKeys.length > 0, "Record keys cannot be null or empty");
+      if (recordKeys[0] == null) {
+        return handleNullRecordKey(null);
+      } else {
+        return requireNonNullNonEmptyKey(recordKeys[0].toString());
+      }
     }
   }
 
   @Override
   public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
-    tryInitRowAccessor(schema);
-
-    Object[] recordKeyValues = rowAccessor.getRecordKeyParts(internalRow);
-    // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite)
-    //       record-key field
-    if (recordKeyValues[0] == null) {
-      return handleNullRecordKey(null);
-    } else if (recordKeyValues[0] instanceof UTF8String) {
-      return requireNonNullNonEmptyKey((UTF8String) recordKeyValues[0]);
+    if (autoGenerateRecordKeys) {
+      return super.getRecordKey(internalRow, schema);
     } else {
-      return requireNonNullNonEmptyKey(UTF8String.fromString(recordKeyValues[0].toString()));
+      tryInitRowAccessor(schema);
+
+      Object[] recordKeyValues = rowAccessor.getRecordKeyParts(internalRow);
+      // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite)
+      //       record-key field
+      if (recordKeyValues[0] == null) {
+        return handleNullRecordKey(null);
+      } else if (recordKeyValues[0] instanceof UTF8String) {
+        return requireNonNullNonEmptyKey((UTF8String) recordKeyValues[0]);
+      } else {
+        return requireNonNullNonEmptyKey(UTF8String.fromString(recordKeyValues[0].toString()));
+      }
     }
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
index fa36f2152cb..1507cf85e14 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
@@ -41,7 +41,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
   private final TimestampBasedAvroKeyGenerator timestampBasedAvroKeyGenerator;
 
   public TimestampBasedKeyGenerator(TypedProperties config) throws IOException {
-    this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
+    this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null),
         config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
   }
 
@@ -61,14 +61,22 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
 
   @Override
   public String getRecordKey(Row row) {
-    tryInitRowAccessor(row.schema());
-    return combineRecordKey(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(row)));
+    if (autoGenerateRecordKeys) {
+      return super.getRecordKey(row);
+    } else {
+      tryInitRowAccessor(row.schema());
+      return combineRecordKey(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(row)));
+    }
   }
 
   @Override
   public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
-    tryInitRowAccessor(schema);
-    return combineRecordKeyUnsafe(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(internalRow)));
+    if (autoGenerateRecordKeys) {
+      return super.getRecordKey(internalRow, schema);
+    } else {
+      tryInitRowAccessor(schema);
+      return combineRecordKeyUnsafe(getRecordKeyFieldNames(), Arrays.asList(rowAccessor.getRecordKeyParts(internalRow)));
+    }
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
index 4cdbbf7577a..4e4c03402d1 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/SparkKeyGenUtils.scala
@@ -48,7 +48,7 @@ object SparkKeyGenUtils {
       case c: BaseKeyGenerator
         if c.isInstanceOf[CustomKeyGenerator] || c.isInstanceOf[CustomAvroKeyGenerator] =>
         c.getPartitionPathFields.asScala.map(pathField =>
-          pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
+          pathField.split(BaseKeyGenerator.SPLIT_REGEX)
             .headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}"))
           .mkString(",")
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeylessKeyGenerator.java b/hudi-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java
similarity index 81%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeylessKeyGenerator.java
rename to hudi-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java
index d487e7e1ff9..787dedd1809 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeylessKeyGenerator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java
@@ -7,13 +7,14 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.hudi.keygen;
@@ -25,6 +26,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 
+import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayDeque;
 import java.util.Arrays;
@@ -37,18 +39,10 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 
 /**
- * This class is used to compute a deterministic key for a record based on the contents of the field. Unlike the other KeyGenerators in Hudi, this class does not take in any field names as args to
- * create a "keyless" experience for insert only workloads. The keys are guaranteed to be deterministic but not unique, so they can only be used for insert workflows with deduplication disabled.
- * The class attempts to get sufficient uniqueness for keys to prevent frequent collisions by choosing the fields it uses in order of decreasing likelihood for uniqueness. The ordering is:
- * <ul>
- *   <li>timestamp</li>
- *   <li>numeric values</li>
- *   <li>string, byte arrays, other types not mentioned</li>
- *   <li>date, lists, maps, booleans</li>
- * </ul>
- * The number of fields is capped to created predictable performance and the generator only uses non-null values to help increase uniqueness for sparse datasets.
+ * Class to assist in generation auto record keys for hudi records.
  */
-public class KeylessKeyGenerator extends CustomAvroKeyGenerator {
+public class AutoRecordKeyGenerator implements Serializable {
+
   private static final String HOODIE_PREFIX = "_hoodie";
   private static final String DOT = ".";
   private final int maxFieldsToConsider;
@@ -56,15 +50,20 @@ public class KeylessKeyGenerator extends CustomAvroKeyGenerator {
   private final Set<String> partitionFieldNames;
   private int[][] fieldOrdering;
 
-  public KeylessKeyGenerator(TypedProperties props) {
-    super(props);
-    this.numFieldsForKey = props.getInteger(KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.key(), KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.defaultValue());
+  public AutoRecordKeyGenerator(TypedProperties config, List<String> partitionPathFields) {
+    this.numFieldsForKey = config.getInteger(KeyGeneratorOptions.NUM_FIELDS_IN_AUTO_RECORDKEY_GENERATION.key(),
+        KeyGeneratorOptions.NUM_FIELDS_IN_AUTO_RECORDKEY_GENERATION.defaultValue());
     // cap the number of fields to order in case of large schemas
     this.maxFieldsToConsider = numFieldsForKey * 3;
-    this.partitionFieldNames = this.getPartitionPathFields().stream().map(field -> field.split(SPLIT_REGEX)[0]).collect(Collectors.toSet());
+    this.partitionFieldNames = partitionPathFields.stream().map(field ->  {
+      if (field.contains(BaseKeyGenerator.SPLIT_REGEX)) {
+        return field.split(BaseKeyGenerator.SPLIT_REGEX)[0];
+      } else {
+        return field;
+      }
+    }).collect(Collectors.toSet());
   }
 
-  @Override
   public String getRecordKey(GenericRecord record) {
     return buildKey(getFieldOrdering(record), record);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
index d0baa903919..4a19dee9989 100644
--- a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
@@ -32,11 +32,16 @@ import java.util.List;
  */
 public abstract class BaseKeyGenerator extends KeyGenerator {
 
+  protected static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+  public static final String SPLIT_REGEX = ":";
+
   protected List<String> recordKeyFields;
   protected List<String> partitionPathFields;
   protected final boolean encodePartitionPath;
   protected final boolean hiveStylePartitioning;
   protected final boolean consistentLogicalTimestampEnabled;
+  protected final boolean autoGenerateRecordKeys;
+  protected AutoRecordKeyGenerator autoRecordKeyGenerator;
 
   protected BaseKeyGenerator(TypedProperties config) {
     super(config);
@@ -46,6 +51,8 @@ public abstract class BaseKeyGenerator extends KeyGenerator {
         Boolean.parseBoolean(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.defaultValue()));
     this.consistentLogicalTimestampEnabled = config.getBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
         Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
+    this.autoGenerateRecordKeys = config.getBoolean(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key(),
+        Boolean.parseBoolean(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.defaultValue()));
   }
 
   /**
@@ -81,4 +88,10 @@ public abstract class BaseKeyGenerator extends KeyGenerator {
   public boolean isConsistentLogicalTimestampEnabled() {
     return consistentLogicalTimestampEnabled;
   }
+
+  protected void instantiateAutoRecordKeyGenerator() {
+    if (autoGenerateRecordKeys) {
+      autoRecordKeyGenerator = new AutoRecordKeyGenerator(config, getPartitionPathFields());
+    }
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
index b0a46ac0676..911fd66db46 100644
--- a/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
@@ -69,11 +69,21 @@ public class KeyGeneratorOptions extends HoodieConfig {
           + "`2016-12-29 09:54:00.0` in row-writer path, while it will be written as long value `1483023240000000` in non row-writer path. "
           + "If enabled, then the timestamp value will be written in both the cases.");
 
-  public static final ConfigProperty<Integer> NUM_FIELDS_IN_KEYLESS_GENERATOR = ConfigProperty
-      .key("hoodie.datasource.write.recordkey.keyless.field.count")
+  public static final ConfigProperty<String> AUTO_GENERATE_RECORD_KEYS = ConfigProperty
+      .key("hoodie.auto.generate.record.keys")
+      .defaultValue("false")
+      .sinceVersion("0.13.0")
+      .withDocumentation("When enabled, hudi will auto generate a deterministic key for a record based on the contents of the field. "
+          + "The keys are guaranteed to be deterministic but not unique, so they can only be used for insert workflows with deduplication disabled."
+          + "The class attempts to get sufficient uniqueness for keys to prevent frequent collisions by choosing the fields it uses in order of decreasing "
+          + "likelihood for uniqueness.");
+
+  public static final ConfigProperty<Integer> NUM_FIELDS_IN_AUTO_RECORDKEY_GENERATION = ConfigProperty
+      .key("hoodie.datasource.write.auto.recordkey.num.fields")
       .defaultValue(5)
-      .withDocumentation("When using the KeylessKeyGenerator, this sets the number of fields to use when computing the UUID for the record. "
-          + "Increasing the value will increase the randomness of the generated key but can impact performance.");
+      .withDocumentation("When enabling auto generation of record keys(hoodie.auto.generate.record.keys) , this sets the number of fields "
+          + "to use when computing the UUID for the record. Increasing the value will increase the randomness of the generated key but can "
+          + "impact performance.");
 
   /**
    * @deprecated Use {@link #URL_ENCODE_PARTITIONING} and its methods.
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
index caed61249a1..4cfa619d627 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
@@ -77,7 +77,8 @@ public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
 
   @Test
   public void testNullRecordKeyFields() {
-    Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+    ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp());
+    Assertions.assertThrows(IllegalArgumentException.class, () -> keyGenerator.getRecordKey(getRecord()));
   }
 
   @Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
index 77d1b34f136..5e7a9f0b8ea 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
@@ -96,7 +96,8 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities {
 
   @Test
   public void testNullRecordKeyFields() {
-    assertThrows(IllegalArgumentException.class, () -> new SimpleKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+    SimpleKeyGenerator keyGenerator = new SimpleKeyGenerator(getPropertiesWithoutRecordKeyProp());
+    assertThrows(IllegalArgumentException.class, () -> keyGenerator.getRecordKey(getRecord()));
   }
 
   @Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoRecordKeyGeneration.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoRecordKeyGeneration.scala
new file mode 100644
index 00000000000..543b4f31c19
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAutoRecordKeyGeneration.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.keygen.TimestampBasedKeyGenerator
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.spark.sql._
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.CsvSource
+
+import scala.collection.JavaConversions._
+
+/**
+ * Tests auto generation of record keys.
+ */
+class TestAutoRecordKeyGeneration extends SparkClientFunctionalTestHarness {
+
+  var commonOpts: Map[String, String] = Map(
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    "hoodie.bulkinsert.shuffle.parallelism" -> "4",
+    "hoodie.delete.shuffle.parallelism" -> "2",
+    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+  )
+
+  @ParameterizedTest
+  @CsvSource(value = Array(
+    "COPY_ON_WRITE|org.apache.hudi.keygen.SimpleKeyGenerator",
+    "COPY_ON_WRITE|org.apache.hudi.keygen.ComplexKeyGenerator",
+    "COPY_ON_WRITE|org.apache.hudi.keygen.TimestampBasedKeyGenerator",
+    "MERGE_ON_READ|org.apache.hudi.keygen.SimpleKeyGenerator",
+    "MERGE_ON_READ|org.apache.hudi.keygen.ComplexKeyGenerator",
+    "MERGE_ON_READ|org.apache.hudi.keygen.TimestampBasedKeyGenerator"
+  ), delimiter = '|')
+  def testRecordKeyGeneration(tableType: String, keyGenClass: String): Unit = {
+    var options: Map[String, String] = commonOpts +
+      (DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass) +
+      (DataSourceWriteOptions.TABLE_TYPE.key() -> tableType) +
+      (KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key() -> "true") +
+      (HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key() -> "true")
+
+    if (keyGenClass == classOf[TimestampBasedKeyGenerator].getName) {
+      options ++= Map(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP -> "DATE_STRING",
+        KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP -> "yyyy-MM-dd",
+        KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP -> "yyyy/MM/dd")
+    }
+
+    val dataGen = new HoodieTestDataGenerator(0xDEED)
+    val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
+    // Bulk Insert Operation
+    val records0 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDf0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
+    inputDf0.write.format("org.apache.hudi")
+      .options(options)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+
+    // Snapshot query
+    val snapshotDf0 = spark.read.format("org.apache.hudi")
+      .load(basePath)
+    assertEquals(100, snapshotDf0.count())
+
+    // Insert
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+    val inputDf1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDf1.write.format("org.apache.hudi")
+      .options(options)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "001"))
+
+    // Snapshot query
+    val snapshotDf1 = spark.read.format("org.apache.hudi")
+      .load(basePath)
+    assertEquals(200, snapshotDf1.count())
+
+    // even though we generate updates, since auto generation of record keys are enabled, it should result in new records
+    val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 50)).toList
+    val updateDf = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    updateDf.write.format("org.apache.hudi")
+      .options(options)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val snapshotDf2 = spark.read.format("hudi")
+      .load(basePath)
+    assertEquals(250, snapshotDf2.count())
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
index 1bb81f7f92e..12c1179fde2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
@@ -95,7 +95,7 @@ class TestDataSourceDefaults extends ScalaAssertionSupport {
       props.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "partitionField")
 
       assertThrows(classOf[IllegalArgumentException]) {
-        new SimpleKeyGenerator(props)
+        new SimpleKeyGenerator(props).getRecordKey(baseRow)
       }
     }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index c2073c703d9..e95d8bab344 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -62,6 +62,7 @@ import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncClient;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.metrics.Metrics;
 import org.apache.hudi.utilities.DummySchemaProvider;
 import org.apache.hudi.utilities.HoodieClusteringJob;
@@ -2421,6 +2422,32 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist");
   }
 
+  @Test
+  public void testAutoGenerateRecordKeys() throws Exception {
+    boolean useSchemaProvider = false;
+    List<String> transformerClassNames = null;
+    PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
+    int parquetRecordsCount = 100;
+    boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty();
+    prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+        PARQUET_SOURCE_ROOT, false, "partition_path", "");
+
+    String tableBasePath = basePath + "/test_parquet_table" + testNum;
+    HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+        transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
+        useSchemaProvider, 100000, false, null, null, "timestamp", null);
+    config.configs.add(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key() + "=true");
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(config, jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+
+    prepareParquetDFSFiles(200, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(parquetRecordsCount + 200, tableBasePath, sqlContext);
+    testNum++;
+  }
+
   class TestDeltaSync extends DeltaSync {
 
     public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props,