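You are viewing a plain text version of this content. The canonical version is the original message in the commits@hudi.apache.org mailing-list archive (the hyperlink was not preserved in this plain-text copy).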
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/11/02 21:17:44 UTC

[hudi] branch master updated: [HUDI-912] Refactor and relocate KeyGenerator to support more engines (#2200)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d160abb  [HUDI-912] Refactor and relocate KeyGenerator to support more engines (#2200)
d160abb is described below

commit d160abb43740e0bcdf40458c345ecd2d74e6698c
Author: wangxianghu <wx...@126.com>
AuthorDate: Tue Nov 3 05:12:51 2020 +0800

    [HUDI-912] Refactor and relocate KeyGenerator to support more engines (#2200)
    
    * [HUDI-912] Refactor and relocate KeyGenerator to support more engines
    
    * Rename KeyGenerators
---
 hudi-client/hudi-client-common/pom.xml             |   5 +
 .../exception/HoodieKeyGeneratorException.java     |  33 +++++
 .../org/apache/hudi/keygen/BaseKeyGenerator.java   |  81 +++++++++++
 .../hudi/keygen/ComplexAvroKeyGenerator.java       |  50 +++++++
 .../apache/hudi/keygen/CustomAvroKeyGenerator.java |  73 ++++------
 .../hudi/keygen/GlobalAvroDeleteKeyGenerator.java  |  59 ++++++++
 .../java/org/apache/hudi/keygen/KeyGenUtils.java   |  33 ++++-
 .../java/org/apache/hudi/keygen/KeyGenerator.java  |  38 +----
 .../keygen/NonpartitionedAvroKeyGenerator.java     |  28 ++--
 .../apache/hudi/keygen/SimpleAvroKeyGenerator.java |  57 ++++++++
 .../keygen/TimestampBasedAvroKeyGenerator.java     |  79 ++++-------
 .../hudi/keygen/constant/KeyGeneratorOptions.java  |  41 ++++++
 .../parser/AbstractHoodieDateTimeParser.java       |   6 +-
 .../keygen/parser/HoodieDateTimeParserImpl.java    |  10 +-
 hudi-client/hudi-spark-client/pom.xml              |   7 +
 .../apache/hudi/keygen/BuiltinKeyGenerator.java    |  80 +++++------
 .../apache/hudi/keygen/ComplexKeyGenerator.java    |  36 ++---
 .../org/apache/hudi/keygen/CustomKeyGenerator.java |  70 ++++------
 .../hudi/keygen/GlobalDeleteKeyGenerator.java      |  18 +--
 .../hudi/keygen/NonpartitionedKeyGenerator.java    |  20 +--
 .../apache/hudi/keygen/RowKeyGeneratorHelper.java  |   4 +-
 .../org/apache/hudi/keygen/SimpleKeyGenerator.java |  16 ++-
 .../hudi/keygen/TimestampBasedKeyGenerator.java    |  82 +++++++++++
 .../org/apache/hudi/AvroConversionHelper.scala     |  17 +--
 .../org/apache/hudi/AvroConversionUtils.scala      |  21 +--
 .../hudi/keygen/TestComplexKeyGenerator.java       | 154 +++++++++++++++++++++
 .../apache/hudi/keygen/TestCustomKeyGenerator.java |  50 +++----
 .../hudi/keygen/TestGlobalDeleteKeyGenerator.java  |  14 +-
 .../apache/hudi/keygen/TestSimpleKeyGenerator.java |  22 +--
 .../keygen/TestTimestampBasedKeyGenerator.java     |  50 +++----
 .../hudi/testutils/KeyGeneratorTestUtilities.java  |   0
 .../apache/hudi/HoodieDatasetBulkInsertHelper.java |   4 +-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  13 +-
 .../org/apache/hudi/HoodieMergeOnReadRDD.scala     |   1 -
 .../src/test/java/TestComplexKeyGenerator.java     |  86 ------------
 .../hudi/keygen/TestComplexKeyGenerator.java       |  96 -------------
 .../org/apache/hudi/TestAvroConversionHelper.scala |   3 +-
 .../org/apache/hudi/TestDataSourceDefaults.scala   |  19 ++-
 38 files changed, 895 insertions(+), 581 deletions(-)

diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml
index 902de58..487a2e2 100644
--- a/hudi-client/hudi-client-common/pom.xml
+++ b/hudi-client/hudi-client-common/pom.xml
@@ -50,6 +50,11 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
     <!-- Logging -->
     <dependency>
       <groupId>log4j</groupId>
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieKeyGeneratorException.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieKeyGeneratorException.java
new file mode 100644
index 0000000..5bf06e2
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieKeyGeneratorException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+/**
+ * Exception thrown for any higher level errors when {@link org.apache.hudi.keygen.KeyGeneratorInterface} is generating
+ * a {@link org.apache.hudi.common.model.HoodieKey}.
+ */
+public class HoodieKeyGeneratorException extends HoodieException {
+
+  public HoodieKeyGeneratorException(String msg, Throwable e) {
+    super(msg, e);
+  }
+
+  public HoodieKeyGeneratorException(String msg) {
+    super(msg);
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
new file mode 100644
index 0000000..8020be8
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public abstract class BaseKeyGenerator extends KeyGenerator {
+
+  protected List<String> recordKeyFields;
+  protected List<String> partitionPathFields;
+  protected final boolean encodePartitionPath;
+  protected final boolean hiveStylePartitioning;
+
+  protected BaseKeyGenerator(TypedProperties config) {
+    super(config);
+    this.encodePartitionPath = config.getBoolean(KeyGeneratorOptions.URL_ENCODE_PARTITIONING_OPT_KEY,
+        Boolean.parseBoolean(KeyGeneratorOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL));
+    this.hiveStylePartitioning = config.getBoolean(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY,
+        Boolean.parseBoolean(KeyGeneratorOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL));
+  }
+
+  /**
+   * Generate a record Key out of provided generic record.
+   */
+  public abstract String getRecordKey(GenericRecord record);
+
+  /**
+   * Generate a partition path out of provided generic record.
+   */
+  public abstract String getPartitionPath(GenericRecord record);
+
+  /**
+   * Generate a Hoodie Key out of provided generic record.
+   */
+  @Override
+  public final HoodieKey getKey(GenericRecord record) {
+    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
+      throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
+    }
+    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+  }
+
+  @Override
+  public final List<String> getRecordKeyFieldNames() {
+    // For nested columns, pick top level column name
+    return getRecordKeyFields().stream().map(k -> {
+      int idx = k.indexOf('.');
+      return idx > 0 ? k.substring(0, idx) : k;
+    }).collect(Collectors.toList());
+  }
+
+  public List<String> getRecordKeyFields() {
+    return recordKeyFields;
+  }
+
+  public List<String> getPartitionPathFields() {
+    return partitionPathFields;
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
new file mode 100644
index 0000000..edc1ad9
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+/**
+ * Avro complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
+ */
+public class ComplexAvroKeyGenerator extends BaseKeyGenerator {
+  public static final String DEFAULT_RECORD_KEY_SEPARATOR = ":";
+
+  public ComplexAvroKeyGenerator(TypedProperties props) {
+    super(props);
+    this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY)
+        .split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
+    this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY)
+        .split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
+  }
+
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
+  }
+
+  @Override
+  public String getPartitionPath(GenericRecord record) {
+    return KeyGenUtils.getRecordPartitionPath(record, getPartitionPathFields(), hiveStylePartitioning, encodePartitionPath);
+  }
+}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
similarity index 64%
copy from hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
copy to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
index e457688..6266fd1 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
@@ -18,14 +18,11 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieDeltaStreamerException;
 import org.apache.hudi.exception.HoodieKeyException;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.spark.sql.Row;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -34,17 +31,17 @@ import java.util.stream.Collectors;
 /**
  * This is a generic implementation of KeyGenerator where users can configure record key as a single field or a combination of fields. Similarly partition path can be configured to have multiple
  * fields or only one field. This class expects value for prop "hoodie.datasource.write.partitionpath.field" in a specific format. For example:
- *
+ * <p>
  * properties.put("hoodie.datasource.write.partitionpath.field", "field1:PartitionKeyType1,field2:PartitionKeyType2").
- *
+ * <p>
  * The complete partition path is created as <value for field1 basis PartitionKeyType1>/<value for field2 basis PartitionKeyType2> and so on.
- *
+ * <p>
  * Few points to consider: 1. If you want to customize some partition path field on a timestamp basis, you can use field1:timestampBased 2. If you simply want to have the value of your configured
  * field in the partition path, use field1:simple 3. If you want your table to be non partitioned, simply leave it as blank.
- *
+ * <p>
  * RecordKey is internally generated using either SimpleKeyGenerator or ComplexKeyGenerator.
  */
-public class CustomKeyGenerator extends BuiltinKeyGenerator {
+public class CustomAvroKeyGenerator extends BaseKeyGenerator {
 
   private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
   private static final String SPLIT_REGEX = ":";
@@ -56,23 +53,14 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
     SIMPLE, TIMESTAMP
   }
 
-  public CustomKeyGenerator(TypedProperties props) {
+  public CustomAvroKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = Arrays.stream(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
-    this.partitionPathFields = Arrays.stream(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
-  }
-
-  @Override
-  public String getPartitionPath(Row row) {
-    return getPartitionPath(Option.empty(), Option.of(row));
+    this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY).split(",")).map(String::trim).collect(Collectors.toList());
+    this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY).split(",")).map(String::trim).collect(Collectors.toList());
   }
 
   @Override
   public String getPartitionPath(GenericRecord record) {
-    return getPartitionPath(Option.of(record), Option.empty());
-  }
-
-  private String getPartitionPath(Option<GenericRecord> record, Option<Row> row) {
     if (getPartitionPathFields() == null) {
       throw new HoodieKeyException("Unable to find field names for partition path in cfg");
     }
@@ -94,27 +82,18 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
       PartitionKeyType keyType = PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
       switch (keyType) {
         case SIMPLE:
-          if (record.isPresent()) {
-            partitionPath.append(new SimpleKeyGenerator(config, partitionPathField).getPartitionPath(record.get()));
-          } else {
-            partitionPath.append(new SimpleKeyGenerator(config, partitionPathField).getPartitionPath(row.get()));
-          }
+          partitionPath.append(new SimpleAvroKeyGenerator(config, partitionPathField).getPartitionPath(record));
           break;
         case TIMESTAMP:
           try {
-            if (record.isPresent()) {
-              partitionPath.append(new TimestampBasedKeyGenerator(config, partitionPathField).getPartitionPath(record.get()));
-            } else {
-              partitionPath.append(new TimestampBasedKeyGenerator(config, partitionPathField).getPartitionPath(row.get()));
-            }
-          } catch (IOException ioe) {
-            throw new HoodieDeltaStreamerException("Unable to initialise TimestampBasedKeyGenerator class");
+            partitionPath.append(new TimestampBasedAvroKeyGenerator(config, partitionPathField).getPartitionPath(record));
+          } catch (IOException e) {
+            throw new HoodieKeyGeneratorException("Unable to initialise TimestampBasedKeyGenerator class");
           }
           break;
         default:
-          throw new HoodieDeltaStreamerException("Please provide valid PartitionKeyType with fields! You provided: " + keyType);
+          throw new HoodieKeyGeneratorException("Please provide valid PartitionKeyType with fields! You provided: " + keyType);
       }
-
       partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
     }
     partitionPath.deleteCharAt(partitionPath.length() - 1);
@@ -125,16 +104,8 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
   public String getRecordKey(GenericRecord record) {
     validateRecordKeyFields();
     return getRecordKeyFields().size() == 1
-        ? new SimpleKeyGenerator(config).getRecordKey(record)
-        : new ComplexKeyGenerator(config).getRecordKey(record);
-  }
-
-  @Override
-  public String getRecordKey(Row row) {
-    validateRecordKeyFields();
-    return getRecordKeyFields().size() == 1
-        ? new SimpleKeyGenerator(config).getRecordKey(row)
-        : new ComplexKeyGenerator(config).getRecordKey(row);
+        ? new SimpleAvroKeyGenerator(config).getRecordKey(record)
+        : new ComplexAvroKeyGenerator(config).getRecordKey(record);
   }
 
   private void validateRecordKeyFields() {
@@ -142,4 +113,12 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
       throw new HoodieKeyException("Unable to find field names for record key in cfg");
     }
   }
+
+  public String getDefaultPartitionPathSeparator() {
+    return DEFAULT_PARTITION_PATH_SEPARATOR;
+  }
+
+  public String getSplitRegex() {
+    return SPLIT_REGEX;
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
new file mode 100644
index 0000000..b074a25
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Avro Key generator for deletes using global indices. Global index deletes do not require partition value so this key generator
+ * avoids using partition value for generating HoodieKey.
+ */
+public class GlobalAvroDeleteKeyGenerator extends BaseKeyGenerator {
+
+  private static final String EMPTY_PARTITION = "";
+
+  public GlobalAvroDeleteKeyGenerator(TypedProperties config) {
+    super(config);
+    this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY).split(","));
+  }
+
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
+  }
+
+  @Override
+  public String getPartitionPath(GenericRecord record) {
+    return EMPTY_PARTITION;
+  }
+
+  @Override
+  public List<String> getPartitionPathFields() {
+    return new ArrayList<>();
+  }
+
+  public String getEmptyPartition() {
+    return EMPTY_PARTITION;
+  }
+}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
similarity index 82%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index 8e9700b..1f59bab 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -18,14 +18,20 @@
 
 package org.apache.hudi.keygen;
 
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
 
 public class KeyGenUtils {
 
@@ -111,4 +117,23 @@ public class KeyGenUtils {
     }
     return partitionPath;
   }
+
+  /**
+   * Create a date time parser class for TimestampBasedKeyGenerator, passing in any configs needed.
+   */
+  public static AbstractHoodieDateTimeParser createDateTimeParser(TypedProperties props, String parserClass) throws IOException  {
+    try {
+      return (AbstractHoodieDateTimeParser) ReflectionUtils.loadClass(parserClass, props);
+    } catch (Throwable e) {
+      throw new IOException("Could not load date time parser class " + parserClass, e);
+    }
+  }
+
+  public static void checkRequiredProperties(TypedProperties props, List<String> checkPropNames) {
+    checkPropNames.forEach(prop -> {
+      if (!props.containsKey(prop)) {
+        throw new HoodieNotSupportedException("Required property " + prop + " is missing");
+      }
+    });
+  }
 }
\ No newline at end of file
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
similarity index 57%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
index 3b1db08..8c3f794 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
@@ -19,30 +19,22 @@
 package org.apache.hudi.keygen;
 
 import org.apache.hudi.ApiMaturityLevel;
-import org.apache.hudi.AvroConversionHelper;
 import org.apache.hudi.PublicAPIClass;
 import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieKey;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.spark.sql.Row;
-import scala.Function1;
 
-import java.io.Serializable;
 import java.util.List;
 
 /**
  * Abstract class to extend for plugging in extraction of {@link HoodieKey} from an Avro record.
  */
 @PublicAPIClass(maturity = ApiMaturityLevel.STABLE)
-public abstract class KeyGenerator implements Serializable, SparkKeyGeneratorInterface {
-
-  private static final String STRUCT_NAME = "hoodieRowTopLevelField";
-  private static final String NAMESPACE = "hoodieRow";
+public abstract class KeyGenerator implements KeyGeneratorInterface {
 
   protected TypedProperties config;
-  private transient Function1<Object, Object> converterFn = null;
 
   protected KeyGenerator(TypedProperties config) {
     this.config = config;
@@ -64,32 +56,4 @@ public abstract class KeyGenerator implements Serializable, SparkKeyGeneratorInt
     throw new UnsupportedOperationException("Bootstrap not supported for key generator. "
         + "Please override this method in your custom key generator.");
   }
-
-  /**
-   * Fetch record key from {@link Row}.
-   * @param row instance of {@link Row} from which record key is requested.
-   * @return the record key of interest from {@link Row}.
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  public String getRecordKey(Row row) {
-    if (null == converterFn) {
-      converterFn = AvroConversionHelper.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
-    }
-    GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
-    return getKey(genericRecord).getRecordKey();
-  }
-
-  /**
-   * Fetch partition path from {@link Row}.
-   * @param row instance of {@link Row} from which partition path is requested
-   * @return the partition path of interest from {@link Row}.
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  public String getPartitionPath(Row row) {
-    if (null == converterFn) {
-      converterFn = AvroConversionHelper.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
-    }
-    GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
-    return getKey(genericRecord).getPartitionPath();
-  }
 }
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
similarity index 55%
copy from hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
copy to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
index db51024..a5272b3 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
@@ -1,13 +1,12 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,23 +17,21 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.hudi.common.config.TypedProperties;
-
 import org.apache.avro.generic.GenericRecord;
-import org.apache.spark.sql.Row;
+import org.apache.hudi.common.config.TypedProperties;
 
 import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Simple Key generator for unpartitioned Hive Tables.
+ * Avro simple Key generator for unpartitioned Hive Tables.
  */
-public class NonpartitionedKeyGenerator extends SimpleKeyGenerator {
+public class NonpartitionedAvroKeyGenerator extends SimpleAvroKeyGenerator {
 
   private static final String EMPTY_PARTITION = "";
   private static final List<String> EMPTY_PARTITION_FIELD_LIST = new ArrayList<>();
 
-  public NonpartitionedKeyGenerator(TypedProperties props) {
+  public NonpartitionedAvroKeyGenerator(TypedProperties props) {
     super(props);
   }
 
@@ -48,8 +45,7 @@ public class NonpartitionedKeyGenerator extends SimpleKeyGenerator {
     return EMPTY_PARTITION_FIELD_LIST;
   }
 
-  @Override
-  public String getPartitionPath(Row row) {
+  public String getEmptyPartition() {
     return EMPTY_PARTITION;
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
new file mode 100644
index 0000000..59fe6be
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import java.util.Collections;
+
+/**
+ * Avro simple key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
+ */
+public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
+
+  public SimpleAvroKeyGenerator(TypedProperties props) {
+    this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY),
+        props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY));
+  }
+
+  SimpleAvroKeyGenerator(TypedProperties props, String partitionPathField) {
+    this(props, null, partitionPathField);
+  }
+
+  SimpleAvroKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) {
+    super(props);
+    this.recordKeyFields = recordKeyField == null
+        ? Collections.emptyList()
+        : Collections.singletonList(recordKeyField);
+    this.partitionPathFields = Collections.singletonList(partitionPathField);
+  }
+
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0));
+  }
+
+  @Override
+  public String getPartitionPath(GenericRecord record) {
+    return KeyGenUtils.getPartitionPath(record, getPartitionPathFields().get(0), hiveStylePartitioning, encodePartitionPath);
+  }
+}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
similarity index 72%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
index 97a7d2e..28048a1 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
@@ -1,13 +1,12 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,19 +17,16 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieDeltaStreamerException;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser;
 import org.apache.hudi.keygen.parser.HoodieDateTimeParserImpl;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.spark.sql.Row;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
@@ -46,15 +42,11 @@ import java.util.concurrent.TimeUnit;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
-import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
-import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
 
 /**
- * Key generator, that relies on timestamps for partitioning field. Still picks record key by name.
+ * Avro key generator that relies on a timestamp field for partitioning. The record key is still picked by field name.
  */
-public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
-
+public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator {
   public enum TimestampType implements Serializable {
     UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, SCALAR
   }
@@ -96,19 +88,19 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
     static final String DATE_TIME_PARSER_PROP = "hoodie.deltastreamer.keygen.datetime.parser.class";
   }
 
-  public TimestampBasedKeyGenerator(TypedProperties config) throws IOException {
-    this(config, config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()),
-        config.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()));
+  public TimestampBasedAvroKeyGenerator(TypedProperties config) throws IOException {
+    this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY),
+        config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY));
   }
 
-  TimestampBasedKeyGenerator(TypedProperties config, String partitionPathField) throws IOException {
+  TimestampBasedAvroKeyGenerator(TypedProperties config, String partitionPathField) throws IOException {
     this(config, null, partitionPathField);
   }
 
-  TimestampBasedKeyGenerator(TypedProperties config, String recordKeyField, String partitionPathField) throws IOException {
+  TimestampBasedAvroKeyGenerator(TypedProperties config, String recordKeyField, String partitionPathField) throws IOException {
     super(config, recordKeyField, partitionPathField);
     String dateTimeParserClass = config.getString(Config.DATE_TIME_PARSER_PROP, HoodieDateTimeParserImpl.class.getName());
-    this.parser = DataSourceUtils.createDateTimeParser(config, dateTimeParserClass);
+    this.parser = KeyGenUtils.createDateTimeParser(config, dateTimeParserClass);
     this.inputDateTimeZone = parser.getInputDateTimeZone();
     this.outputDateTimeZone = parser.getOutputDateTimeZone();
     this.outputDateFormat = parser.getOutputDateFormat();
@@ -128,8 +120,8 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
       default:
         timeUnit = null;
     }
-    this.encodePartitionPath = config.getBoolean(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY(),
-        Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL()));
+    this.encodePartitionPath = config.getBoolean(KeyGeneratorOptions.URL_ENCODE_PARTITIONING_OPT_KEY,
+        Boolean.parseBoolean(KeyGeneratorOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL));
   }
 
   @Override
@@ -141,14 +133,14 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
     try {
       return getPartitionPath(partitionVal);
     } catch (Exception e) {
-      throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, e);
+      throw new HoodieKeyGeneratorException("Unable to parse input partition field :" + partitionVal, e);
     }
   }
 
   /**
    * Set default value to partitionVal if the input value of partitionPathField is null.
    */
-  private Object getDefaultPartitionVal() {
+  public Object getDefaultPartitionVal() {
     Object result = 1L;
     if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
       // since partitionVal is null, we can set a default value of any format as TIMESTAMP_INPUT_DATE_FORMAT_PROP
@@ -191,7 +183,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
    * @param partitionVal partition path object value fetched from record/row
    * @return the parsed partition path based on data type
    */
-  private String getPartitionPath(Object partitionVal) {
+  public String getPartitionPath(Object partitionVal) {
     initIfNeeded();
     long timeMs;
     if (partitionVal instanceof Double) {
@@ -202,7 +194,7 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
       timeMs = convertLongTimeToMillis((Long) partitionVal);
     } else if (partitionVal instanceof CharSequence) {
       if (!inputFormatter.isPresent()) {
-        throw new HoodieException("Missing inputformatter. Ensure " +  Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
+        throw new HoodieException("Missing inputformatter. Ensure " + Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
       }
       DateTime parsedDateTime = inputFormatter.get().parseDateTime(partitionVal.toString());
       if (this.outputDateTimeZone == null) {
@@ -235,27 +227,4 @@ public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
     return MILLISECONDS.convert(partitionVal, timeUnit);
   }
 
-  @Override
-  public String getRecordKey(Row row) {
-    buildFieldPositionMapIfNeeded(row.schema());
-    return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeyPositions, false);
-  }
-
-  @Override
-  public String getPartitionPath(Row row) {
-    Object fieldVal = null;
-    buildFieldPositionMapIfNeeded(row.schema());
-    Object partitionPathFieldVal =  RowKeyGeneratorHelper.getNestedFieldVal(row, partitionPathPositions.get(getPartitionPathFields().get(0)));
-    try {
-      if (partitionPathFieldVal == null || partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER)
-          || partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
-        fieldVal = getDefaultPartitionVal();
-      } else {
-        fieldVal = partitionPathFieldVal;
-      }
-      return getPartitionPath(fieldVal);
-    } catch (Exception e) {
-      throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + fieldVal, e);
-    }
-  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
new file mode 100644
index 0000000..da567e0
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen.constant;
+
+public class KeyGeneratorOptions {
+
+  /**
+   * URL_ENCODE_PARTITIONING_OPT_KEY: flag to indicate whether the partition path value should be URL encoded (default false).
+   * HIVE_STYLE_PARTITIONING_OPT_KEY: flag to indicate whether to use Hive style partitioning. If set true, the names of
+   * partition folders follow <partition_column_name>=<partition_value> format. By default false (only partition values).
+   */
+  public static final String URL_ENCODE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.partitionpath.urlencode";
+  public static final String DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL = "false";
+  public static final String HIVE_STYLE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.hive_style_partitioning";
+  public static final String DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL = "false";
+
+  /**
+   * Record key field. Value to be used as the `recordKey` component of `HoodieKey`. Actual value
+   * will be obtained by invoking .toString() on the field value. Nested fields can be specified using
+   * the dot notation, e.g. `a.b.c`. PARTITIONPATH_FIELD_OPT_KEY names the field used for the partition path.
+   */
+  public static final String RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field";
+  public static final String PARTITIONPATH_FIELD_OPT_KEY = "hoodie.datasource.write.partitionpath.field";
+}
+
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java
similarity index 88%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java
index 80e26cc..6fb05c3 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java
@@ -19,7 +19,7 @@ package org.apache.hudi.keygen.parser;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
+import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormatter;
 
@@ -36,7 +36,7 @@ public abstract class AbstractHoodieDateTimeParser implements Serializable {
   }
 
   private String initInputDateFormatDelimiter() {
-    String inputDateFormatDelimiter = config.getString(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, ",").trim();
+    String inputDateFormatDelimiter = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, ",").trim();
     inputDateFormatDelimiter = inputDateFormatDelimiter.isEmpty() ? "," : inputDateFormatDelimiter;
     return inputDateFormatDelimiter;
   }
@@ -45,7 +45,7 @@ public abstract class AbstractHoodieDateTimeParser implements Serializable {
    * Returns the output date format in which the partition paths will be created for the hudi dataset.
    */
   public String getOutputDateFormat() {
-    return config.getString(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP);
+    return config.getString(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP);
   }
 
   /**
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
similarity index 91%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
index 41452d0..81960ea 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java
@@ -17,11 +17,11 @@
 
 package org.apache.hudi.keygen.parser;
 
-import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.keygen.TimestampBasedKeyGenerator.Config;
-import org.apache.hudi.keygen.TimestampBasedKeyGenerator.TimestampType;
+import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType;
+import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config;
+import org.apache.hudi.keygen.KeyGenUtils;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
@@ -42,7 +42,7 @@ public class HoodieDateTimeParserImpl extends AbstractHoodieDateTimeParser {
 
   public HoodieDateTimeParserImpl(TypedProperties config) {
     super(config);
-    DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP));
+    KeyGenUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP));
     this.inputDateTimeZone = getInputDateTimeZone();
   }
 
@@ -79,7 +79,7 @@ public class HoodieDateTimeParserImpl extends AbstractHoodieDateTimeParser {
   public Option<DateTimeFormatter> getInputFormatter() {
     TimestampType timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
     if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
-      DataSourceUtils.checkRequiredProperties(config,
+      KeyGenUtils.checkRequiredProperties(config,
           Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
       this.configInputDateFormatList = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "");
       return Option.of(getInputDateFormatter());
diff --git a/hudi-client/hudi-spark-client/pom.xml b/hudi-client/hudi-spark-client/pom.xml
index d99346d..5cc6ad6 100644
--- a/hudi-client/hudi-spark-client/pom.xml
+++ b/hudi-client/hudi-spark-client/pom.xml
@@ -31,6 +31,13 @@
   <packaging>jar</packaging>
 
   <dependencies>
+    <!-- Scala -->
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
+
     <!-- Hudi  -->
     <dependency>
       <groupId>org.apache.hudi</groupId>
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
similarity index 61%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
index 8c973a6..a0c1991 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
@@ -18,70 +18,67 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.exception.HoodieKeyException;
-
-import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.StructType;
+import scala.Function1;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /**
- * Base class for all the built-in key generators. Contains methods structured for
+ * Base class for the built-in key generators. Contains methods structured for
  * code reuse amongst them.
  */
-public abstract class BuiltinKeyGenerator extends KeyGenerator {
+public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements SparkKeyGeneratorInterface {
 
-  protected List<String> recordKeyFields;
-  protected List<String> partitionPathFields;
-  protected final boolean encodePartitionPath;
-  protected final boolean hiveStylePartitioning;
+  private static final String STRUCT_NAME = "hoodieRowTopLevelField";
+  private static final String NAMESPACE = "hoodieRow";
+  private transient Function1<Object, Object> converterFn = null;
+  protected StructType structType;
 
   protected Map<String, List<Integer>> recordKeyPositions = new HashMap<>();
   protected Map<String, List<Integer>> partitionPathPositions = new HashMap<>();
-  protected StructType structType;
 
   protected BuiltinKeyGenerator(TypedProperties config) {
     super(config);
-    this.encodePartitionPath = config.getBoolean(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY(),
-        Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL()));
-    this.hiveStylePartitioning = config.getBoolean(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(),
-        Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL()));
   }
 
   /**
-   * Generate a record Key out of provided generic record.
-   */
-  public abstract String getRecordKey(GenericRecord record);
-
-  /**
-   * Generate a partition path out of provided generic record.
-   */
-  public abstract String getPartitionPath(GenericRecord record);
-
-  /**
-   * Generate a Hoodie Key out of provided generic record.
+   * Fetch record key from {@link Row}.
+   * @param row instance of {@link Row} from which record key is requested.
+   * @return the record key of interest from {@link Row}.
    */
-  public final HoodieKey getKey(GenericRecord record) {
-    if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
-      throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
+  @Override
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public String getRecordKey(Row row) {
+    if (null == converterFn) {
+      converterFn = AvroConversionHelper.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
     }
-    return new HoodieKey(getRecordKey(record), getPartitionPath(record));
+    GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
+    return getKey(genericRecord).getRecordKey();
   }
 
+  /**
+   * Fetch partition path from {@link Row}.
+   * @param row instance of {@link Row} from which partition path is requested
+   * @return the partition path of interest from {@link Row}.
+   */
   @Override
-  public final List<String> getRecordKeyFieldNames() {
-    // For nested columns, pick top level column name
-    return getRecordKeyFields().stream().map(k -> {
-      int idx = k.indexOf('.');
-      return idx > 0 ? k.substring(0, idx) : k;
-    }).collect(Collectors.toList());
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public String getPartitionPath(Row row) {
+    if (null == converterFn) {
+      converterFn = AvroConversionHelper.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
+    }
+    GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
+    return getKey(genericRecord).getPartitionPath();
   }
 
   void buildFieldPositionMapIfNeeded(StructType structType) {
@@ -119,12 +116,5 @@ public abstract class BuiltinKeyGenerator extends KeyGenerator {
       this.structType = structType;
     }
   }
-
-  public List<String> getRecordKeyFields() {
-    return recordKeyFields;
-  }
-
-  public List<String> getPartitionPathFields() {
-    return partitionPathFields;
-  }
 }
+
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
similarity index 61%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
index e679e99..36c8345 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java
@@ -1,13 +1,12 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,38 +17,38 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.common.config.TypedProperties;
-
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.spark.sql.Row;
 
 import java.util.Arrays;
 import java.util.stream.Collectors;
-import org.apache.spark.sql.Row;
 
 /**
  * Complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
  */
 public class ComplexKeyGenerator extends BuiltinKeyGenerator {
 
-  public static final String DEFAULT_RECORD_KEY_SEPARATOR = ":";
+  private final ComplexAvroKeyGenerator complexAvroKeyGenerator;
 
   public ComplexKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = Arrays.stream(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY())
+    this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY)
         .split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
-    this.partitionPathFields = Arrays.stream(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY())
+    this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY)
         .split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
+    complexAvroKeyGenerator = new ComplexAvroKeyGenerator(props);
   }
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
+    return complexAvroKeyGenerator.getRecordKey(record);
   }
 
   @Override
   public String getPartitionPath(GenericRecord record) {
-    return KeyGenUtils.getRecordPartitionPath(record, getPartitionPathFields(), hiveStylePartitioning, encodePartitionPath);
+    return complexAvroKeyGenerator.getPartitionPath(record);
   }
 
   @Override
@@ -64,4 +63,5 @@ public class ComplexKeyGenerator extends BuiltinKeyGenerator {
     return RowKeyGeneratorHelper.getPartitionPathFromRow(row, getPartitionPathFields(),
         hiveStylePartitioning, partitionPathPositions);
   }
-}
\ No newline at end of file
+
+}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
similarity index 75%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
index e457688..6727b79 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
@@ -18,13 +18,12 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieDeltaStreamerException;
 import org.apache.hudi.exception.HoodieKeyException;
-
-import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.spark.sql.Row;
 
 import java.io.IOException;
@@ -46,30 +45,36 @@ import java.util.stream.Collectors;
  */
 public class CustomKeyGenerator extends BuiltinKeyGenerator {
 
-  private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
-  private static final String SPLIT_REGEX = ":";
-
-  /**
-   * Used as a part of config in CustomKeyGenerator.java.
-   */
-  public enum PartitionKeyType {
-    SIMPLE, TIMESTAMP
-  }
+  private final CustomAvroKeyGenerator customAvroKeyGenerator;
 
   public CustomKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = Arrays.stream(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
-    this.partitionPathFields = Arrays.stream(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList());
+    this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY).split(",")).map(String::trim).collect(Collectors.toList());
+    this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY).split(",")).map(String::trim).collect(Collectors.toList());
+    customAvroKeyGenerator = new CustomAvroKeyGenerator(props);
   }
 
   @Override
-  public String getPartitionPath(Row row) {
-    return getPartitionPath(Option.empty(), Option.of(row));
+  public String getRecordKey(GenericRecord record) {
+    return customAvroKeyGenerator.getRecordKey(record);
   }
 
   @Override
   public String getPartitionPath(GenericRecord record) {
-    return getPartitionPath(Option.of(record), Option.empty());
+    return customAvroKeyGenerator.getPartitionPath(record);
+  }
+
+  @Override
+  public String getRecordKey(Row row) {
+    validateRecordKeyFields();
+    return getRecordKeyFields().size() == 1
+        ? new SimpleKeyGenerator(config).getRecordKey(row)
+        : new ComplexKeyGenerator(config).getRecordKey(row);
+  }
+
+  @Override
+  public String getPartitionPath(Row row) {
+    return getPartitionPath(Option.empty(), Option.of(row));
   }
 
   private String getPartitionPath(Option<GenericRecord> record, Option<Row> row) {
@@ -85,13 +90,13 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
       return "";
     }
     for (String field : getPartitionPathFields()) {
-      String[] fieldWithType = field.split(SPLIT_REGEX);
+      String[] fieldWithType = field.split(customAvroKeyGenerator.getSplitRegex());
       if (fieldWithType.length != 2) {
-        throw new HoodieKeyException("Unable to find field names for partition path in proper format");
+        throw new HoodieKeyGeneratorException("Unable to find field names for partition path in proper format");
       }
 
       partitionPathField = fieldWithType[0];
-      PartitionKeyType keyType = PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
+      CustomAvroKeyGenerator.PartitionKeyType keyType = CustomAvroKeyGenerator.PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
       switch (keyType) {
         case SIMPLE:
           if (record.isPresent()) {
@@ -108,38 +113,23 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
               partitionPath.append(new TimestampBasedKeyGenerator(config, partitionPathField).getPartitionPath(row.get()));
             }
           } catch (IOException ioe) {
-            throw new HoodieDeltaStreamerException("Unable to initialise TimestampBasedKeyGenerator class");
+            throw new HoodieKeyGeneratorException("Unable to initialise TimestampBasedKeyGenerator class");
           }
           break;
         default:
-          throw new HoodieDeltaStreamerException("Please provide valid PartitionKeyType with fields! You provided: " + keyType);
+          throw new HoodieKeyGeneratorException("Please provide valid PartitionKeyType with fields! You provided: " + keyType);
       }
 
-      partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
+      partitionPath.append(customAvroKeyGenerator.getDefaultPartitionPathSeparator());
     }
     partitionPath.deleteCharAt(partitionPath.length() - 1);
     return partitionPath.toString();
   }
 
-  @Override
-  public String getRecordKey(GenericRecord record) {
-    validateRecordKeyFields();
-    return getRecordKeyFields().size() == 1
-        ? new SimpleKeyGenerator(config).getRecordKey(record)
-        : new ComplexKeyGenerator(config).getRecordKey(record);
-  }
-
-  @Override
-  public String getRecordKey(Row row) {
-    validateRecordKeyFields();
-    return getRecordKeyFields().size() == 1
-        ? new SimpleKeyGenerator(config).getRecordKey(row)
-        : new ComplexKeyGenerator(config).getRecordKey(row);
-  }
-
   private void validateRecordKeyFields() {
     if (getRecordKeyFields() == null || getRecordKeyFields().isEmpty()) {
       throw new HoodieKeyException("Unable to find field names for record key in cfg");
     }
   }
 }
+
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
similarity index 78%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
index 243493b..5c9a813 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java
@@ -18,10 +18,9 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.common.config.TypedProperties;
-
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.spark.sql.Row;
 
 import java.util.ArrayList;
@@ -34,21 +33,21 @@ import java.util.List;
  */
 public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator {
 
-  private static final String EMPTY_PARTITION = "";
-
+  private final GlobalAvroDeleteKeyGenerator globalAvroDeleteKeyGenerator;
   public GlobalDeleteKeyGenerator(TypedProperties config) {
     super(config);
-    this.recordKeyFields = Arrays.asList(config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(","));
+    this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY).split(","));
+    globalAvroDeleteKeyGenerator = new GlobalAvroDeleteKeyGenerator(config);
   }
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
+    return globalAvroDeleteKeyGenerator.getRecordKey(record);
   }
 
   @Override
   public String getPartitionPath(GenericRecord record) {
-    return EMPTY_PARTITION;
+    return globalAvroDeleteKeyGenerator.getPartitionPath(record);
   }
 
   @Override
@@ -64,6 +63,7 @@ public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator {
 
   @Override
   public String getPartitionPath(Row row) {
-    return EMPTY_PARTITION;
+    return globalAvroDeleteKeyGenerator.getEmptyPartition();
   }
 }
+
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
similarity index 75%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
index db51024..543e134 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java
@@ -18,12 +18,10 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.hudi.common.config.TypedProperties;
-
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.spark.sql.Row;
 
-import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -31,25 +29,27 @@ import java.util.List;
  */
 public class NonpartitionedKeyGenerator extends SimpleKeyGenerator {
 
-  private static final String EMPTY_PARTITION = "";
-  private static final List<String> EMPTY_PARTITION_FIELD_LIST = new ArrayList<>();
+  private final NonpartitionedAvroKeyGenerator nonpartitionedAvroKeyGenerator;
 
-  public NonpartitionedKeyGenerator(TypedProperties props) {
-    super(props);
+  public NonpartitionedKeyGenerator(TypedProperties config) {
+    super(config);
+    nonpartitionedAvroKeyGenerator = new NonpartitionedAvroKeyGenerator(config);
   }
 
   @Override
   public String getPartitionPath(GenericRecord record) {
-    return EMPTY_PARTITION;
+    return nonpartitionedAvroKeyGenerator.getPartitionPath(record);
   }
 
   @Override
   public List<String> getPartitionPathFields() {
-    return EMPTY_PARTITION_FIELD_LIST;
+    return nonpartitionedAvroKeyGenerator.getPartitionPathFields();
   }
 
   @Override
   public String getPartitionPath(Row row) {
-    return EMPTY_PARTITION;
+    return nonpartitionedAvroKeyGenerator.getEmptyPartition();
   }
+
 }
+
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
similarity index 99%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
index 4c05489..dd0d4c5 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
@@ -19,10 +19,10 @@
 package org.apache.hudi.keygen;
 
 import org.apache.hudi.exception.HoodieKeyException;
-
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import scala.Option;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -33,8 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import scala.Option;
-
 import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
 import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
 import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
similarity index 82%
rename from hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
index c2b8b12..332686d 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
@@ -18,10 +18,9 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.common.config.TypedProperties;
-
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.spark.sql.Row;
 
 import java.util.Collections;
@@ -31,9 +30,11 @@ import java.util.Collections;
  */
 public class SimpleKeyGenerator extends BuiltinKeyGenerator {
 
+  private final SimpleAvroKeyGenerator simpleAvroKeyGenerator;
+
   public SimpleKeyGenerator(TypedProperties props) {
-    this(props, props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()),
-        props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()));
+    this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY),
+        props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY));
   }
 
   SimpleKeyGenerator(TypedProperties props, String partitionPathField) {
@@ -46,16 +47,17 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator {
         ? Collections.emptyList()
         : Collections.singletonList(recordKeyField);
     this.partitionPathFields = Collections.singletonList(partitionPathField);
+    simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, recordKeyField, partitionPathField);
   }
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0));
+    return simpleAvroKeyGenerator.getRecordKey(record);
   }
 
   @Override
   public String getPartitionPath(GenericRecord record) {
-    return KeyGenUtils.getPartitionPath(record, getPartitionPathFields().get(0), hiveStylePartitioning, encodePartitionPath);
+    return simpleAvroKeyGenerator.getPartitionPath(record);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
new file mode 100644
index 0000000..859269c
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.spark.sql.Row;
+
+import java.io.IOException;
+
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
+
+/**
+ * Key generator, that relies on timestamps for partitioning field. Still picks record key by name.
+ */
+public class TimestampBasedKeyGenerator extends SimpleKeyGenerator {
+
+  private final TimestampBasedAvroKeyGenerator timestampBasedAvroKeyGenerator;
+
+  public TimestampBasedKeyGenerator(TypedProperties config) throws IOException {
+    this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY),
+        config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY));
+  }
+
+  TimestampBasedKeyGenerator(TypedProperties config, String partitionPathField) throws IOException {
+    this(config, null, partitionPathField);
+  }
+
+  TimestampBasedKeyGenerator(TypedProperties config, String recordKeyField, String partitionPathField) throws IOException {
+    super(config, recordKeyField, partitionPathField);
+    timestampBasedAvroKeyGenerator = new TimestampBasedAvroKeyGenerator(config, recordKeyField, partitionPathField);
+  }
+
+  @Override
+  public String getPartitionPath(GenericRecord record) {
+    return timestampBasedAvroKeyGenerator.getPartitionPath(record);
+  }
+
+  @Override
+  public String getRecordKey(Row row) {
+    buildFieldPositionMapIfNeeded(row.schema());
+    return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeyPositions, false);
+  }
+
+  @Override
+  public String getPartitionPath(Row row) {
+    Object fieldVal = null;
+    buildFieldPositionMapIfNeeded(row.schema());
+    Object partitionPathFieldVal = RowKeyGeneratorHelper.getNestedFieldVal(row, partitionPathPositions.get(getPartitionPathFields().get(0)));
+    try {
+      if (partitionPathFieldVal == null || partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER)
+          || partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
+        fieldVal = timestampBasedAvroKeyGenerator.getDefaultPartitionVal();
+      } else {
+        fieldVal = partitionPathFieldVal;
+      }
+      return timestampBasedAvroKeyGenerator.getPartitionPath(fieldVal);
+    } catch (Exception e) {
+      throw new HoodieKeyGeneratorException("Unable to parse input partition field :" + fieldVal, e);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
similarity index 96%
rename from hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
rename to hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
index c701e70..db1ca6f 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,10 +24,10 @@ import java.util
 
 import org.apache.avro.Conversions.DecimalConversion
 import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
-import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Schema.Type._
 import org.apache.avro.generic.GenericData.{Fixed, Record}
 import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord}
+import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters}
 import org.apache.spark.sql.catalyst.expressions.GenericRow
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
similarity index 88%
rename from hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
rename to hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 70a1356..d1a4249 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,14 +18,14 @@
 
 package org.apache.hudi
 
-import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
-import org.apache.hudi.common.model.HoodieKey
 import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
 import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieKey
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 
 import scala.collection.JavaConverters._
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
new file mode 100644
index 0000000..54f4ffa
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static junit.framework.TestCase.assertEquals;
+
+public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
+
+  private TypedProperties getCommonProps(boolean getComplexRecordKey) {
+    TypedProperties properties = new TypedProperties();
+    if (getComplexRecordKey) {
+      properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key, pii_col");
+    } else {
+      properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
+    }
+    properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true");
+    return properties;
+  }
+
+  private TypedProperties getPropertiesWithoutPartitionPathProp() {
+    return getCommonProps(false);
+  }
+
+  private TypedProperties getPropertiesWithoutRecordKeyProp() {
+    TypedProperties properties = new TypedProperties();
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
+    return properties;
+  }
+
+  private TypedProperties getWrongRecordKeyFieldProps() {
+    TypedProperties properties = new TypedProperties();
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
+    properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_wrong_key");
+    return properties;
+  }
+
+  private TypedProperties getProps() {
+    TypedProperties properties = getCommonProps(true);
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp,ts_ms");
+    return properties;
+  }
+
+  @Test
+  public void testNullPartitionPathFields() {
+    Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutPartitionPathProp()));
+  }
+
+  @Test
+  public void testNullRecordKeyFields() {
+    Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp()));
+  }
+
+  @Test
+  public void testWrongRecordKeyField() {
+    ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getWrongRecordKeyFieldProps());
+    Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
+    Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType));
+  }
+
+  @Test
+  public void testHappyFlow() {
+    ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getProps());
+    GenericRecord record = getRecord();
+    HoodieKey key = keyGenerator.getKey(record);
+    Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
+    Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686/ts_ms=2020-03-21");
+    Row row = KeyGeneratorTestUtilities.getRow(record);
+    Assertions.assertEquals(keyGenerator.getRecordKey(row), "_row_key:key1,pii_col:pi");
+    Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686/ts_ms=2020-03-21");
+  }
+
+  @Test
+  public void testSingleValueKeyGenerator() {
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
+    properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
+    ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
+    assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 1);
+    assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 1);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
+    String rowKey = record.get("_row_key").toString();
+    String partitionPath = record.get("timestamp").toString();
+    HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
+    assertEquals("_row_key:" + rowKey, hoodieKey.getRecordKey());
+    assertEquals(partitionPath, hoodieKey.getPartitionPath());
+  }
+
+  @Test
+  public void testMultipleValueKeyGenerator() {
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key,timestamp");
+    properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "rider,driver");
+    ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
+    assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 2);
+    assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 2);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
+    String rowKey =
+        "_row_key" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("_row_key").toString() + ","
+            + "timestamp" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("timestamp").toString();
+    String partitionPath = record.get("rider").toString() + "/" + record.get("driver").toString();
+    HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
+    assertEquals(rowKey, hoodieKey.getRecordKey());
+    assertEquals(partitionPath, hoodieKey.getPartitionPath());
+  }
+
+  @Test
+  public void testMultipleValueKeyGeneratorNonPartitioned() {
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key,timestamp");
+    properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "");
+    ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
+    assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 2);
+    assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 0);
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
+    String rowKey =
+        "_row_key" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("_row_key").toString() + ","
+            + "timestamp" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("timestamp").toString();
+    String partitionPath = "";
+    HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
+    assertEquals(rowKey, hoodieKey.getRecordKey());
+    assertEquals(partitionPath, hoodieKey.getPartitionPath());
+  }
+}
diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
similarity index 77%
rename from hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
index add2547..dc30b93 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
@@ -18,11 +18,11 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.config.TypedProperties;
 
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Assertions;
@@ -33,48 +33,48 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
   private TypedProperties getCommonProps(boolean getComplexRecordKey) {
     TypedProperties properties = new TypedProperties();
     if (getComplexRecordKey) {
-      properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key, pii_col");
+      properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key, pii_col");
     } else {
-      properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
+      properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
     }
-    properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true");
+    properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true");
     return properties;
   }
 
   private TypedProperties getPropertiesForSimpleKeyGen() {
     TypedProperties properties = getCommonProps(false);
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple");
     return properties;
   }
 
   private TypedProperties getImproperPartitionFieldFormatProp() {
     TypedProperties properties = getCommonProps(false);
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
     return properties;
   }
 
   private TypedProperties getInvalidPartitionKeyTypeProps() {
     TypedProperties properties = getCommonProps(false);
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:dummy");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:dummy");
     return properties;
   }
 
   private TypedProperties getComplexRecordKeyWithSimplePartitionProps() {
     TypedProperties properties = getCommonProps(true);
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple");
     return properties;
   }
 
   private TypedProperties getComplexRecordKeyAndPartitionPathProps() {
     TypedProperties properties = getCommonProps(true);
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple,ts_ms:timestamp");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple,ts_ms:timestamp");
     populateNecessaryPropsForTimestampBasedKeyGen(properties);
     return properties;
   }
 
   private TypedProperties getPropsWithoutRecordKeyFieldProps() {
     TypedProperties properties = new TypedProperties();
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple");
     return properties;
   }
 
@@ -86,20 +86,20 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
 
   private TypedProperties getPropertiesForTimestampBasedKeyGen() {
     TypedProperties properties = getCommonProps(false);
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "ts_ms:timestamp");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "ts_ms:timestamp");
     populateNecessaryPropsForTimestampBasedKeyGen(properties);
     return properties;
   }
 
   private TypedProperties getPropertiesForNonPartitionedKeyGen() {
     TypedProperties properties = getCommonProps(false);
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "");
     return properties;
   }
 
   @Test
   public void testSimpleKeyGenerator() {
-    KeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForSimpleKeyGen());
+    BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForSimpleKeyGen());
     GenericRecord record = getRecord();
     HoodieKey key = keyGenerator.getKey(record);
     Assertions.assertEquals(key.getRecordKey(), "key1");
@@ -111,7 +111,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
 
   @Test
   public void testTimestampBasedKeyGenerator() {
-    KeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForTimestampBasedKeyGen());
+    BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForTimestampBasedKeyGen());
     GenericRecord record = getRecord();
     HoodieKey key = keyGenerator.getKey(record);
     Assertions.assertEquals(key.getRecordKey(), "key1");
@@ -123,7 +123,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
 
   @Test
   public void testNonPartitionedKeyGenerator() {
-    KeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForNonPartitionedKeyGen());
+    BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForNonPartitionedKeyGen());
     GenericRecord record = getRecord();
     HoodieKey key = keyGenerator.getKey(record);
     Assertions.assertEquals(key.getRecordKey(), "key1");
@@ -136,28 +136,28 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
   @Test
   public void testInvalidPartitionKeyType() {
     try {
-      KeyGenerator keyGenerator = new CustomKeyGenerator(getInvalidPartitionKeyTypeProps());
+      BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getInvalidPartitionKeyTypeProps());
       keyGenerator.getKey(getRecord());
       Assertions.fail("should fail when invalid PartitionKeyType is provided!");
     } catch (Exception e) {
-      Assertions.assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.keygen.CustomKeyGenerator.PartitionKeyType.DUMMY"));
+      Assertions.assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"));
     }
 
     try {
-      KeyGenerator keyGenerator = new CustomKeyGenerator(getInvalidPartitionKeyTypeProps());
+      BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getInvalidPartitionKeyTypeProps());
       GenericRecord record = getRecord();
       Row row = KeyGeneratorTestUtilities.getRow(record);
       keyGenerator.getPartitionPath(row);
       Assertions.fail("should fail when invalid PartitionKeyType is provided!");
     } catch (Exception e) {
-      Assertions.assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.keygen.CustomKeyGenerator.PartitionKeyType.DUMMY"));
+      Assertions.assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"));
     }
   }
 
   @Test
   public void testNoRecordKeyFieldProp() {
     try {
-      KeyGenerator keyGenerator = new CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps());
+      BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps());
       keyGenerator.getKey(getRecord());
       Assertions.fail("should fail when record key field is not provided!");
     } catch (Exception e) {
@@ -165,7 +165,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
     }
 
     try {
-      KeyGenerator keyGenerator = new CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps());
+      BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps());
       GenericRecord record = getRecord();
       Row row = KeyGeneratorTestUtilities.getRow(record);
       keyGenerator.getRecordKey(row);
@@ -178,7 +178,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
   @Test
   public void testPartitionFieldsInImproperFormat() {
     try {
-      KeyGenerator keyGenerator = new CustomKeyGenerator(getImproperPartitionFieldFormatProp());
+      BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getImproperPartitionFieldFormatProp());
       keyGenerator.getKey(getRecord());
       Assertions.fail("should fail when partition key field is provided in improper format!");
     } catch (Exception e) {
@@ -186,7 +186,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
     }
 
     try {
-      KeyGenerator keyGenerator = new CustomKeyGenerator(getImproperPartitionFieldFormatProp());
+      BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getImproperPartitionFieldFormatProp());
       GenericRecord record = getRecord();
       Row row = KeyGeneratorTestUtilities.getRow(record);
       keyGenerator.getPartitionPath(row);
@@ -198,7 +198,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
 
   @Test
   public void testComplexRecordKeyWithSimplePartitionPath() {
-    KeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyWithSimplePartitionProps());
+    BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyWithSimplePartitionProps());
     GenericRecord record = getRecord();
     HoodieKey key = keyGenerator.getKey(record);
     Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
@@ -211,7 +211,7 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
 
   @Test
   public void testComplexRecordKeysWithComplexPartitionPath() {
-    KeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyAndPartitionPathProps());
+    BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyAndPartitionPathProps());
     GenericRecord record = getRecord();
     HoodieKey key = keyGenerator.getKey(record);
     Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java
similarity index 84%
rename from hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java
index 96d607a..078101b 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java
@@ -18,12 +18,12 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.exception.HoodieKeyException;
 
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Assertions;
@@ -34,29 +34,29 @@ public class TestGlobalDeleteKeyGenerator extends KeyGeneratorTestUtilities {
   private TypedProperties getCommonProps(boolean getComplexRecordKey) {
     TypedProperties properties = new TypedProperties();
     if (getComplexRecordKey) {
-      properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,pii_col");
+      properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key,pii_col");
     } else {
-      properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
+      properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
     }
-    properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true");
+    properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true");
     return properties;
   }
 
   private TypedProperties getPropertiesWithoutRecordKeyProp() {
     TypedProperties properties = new TypedProperties();
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
     return properties;
   }
 
   private TypedProperties getWrongRecordKeyFieldProps() {
     TypedProperties properties = new TypedProperties();
-    properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_wrong_key");
+    properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_wrong_key");
     return properties;
   }
 
   private TypedProperties getProps() {
     TypedProperties properties = getCommonProps(true);
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp,ts_ms");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp,ts_ms");
     return properties;
   }
 
diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
similarity index 82%
rename from hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
index 4eb184e..80b85d8 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java
@@ -18,12 +18,12 @@
 
 package org.apache.hudi.keygen;
 
-import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.exception.HoodieKeyException;
 
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Assertions;
@@ -33,8 +33,8 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities {
 
   private TypedProperties getCommonProps() {
     TypedProperties properties = new TypedProperties();
-    properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
-    properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true");
+    properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
+    properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true");
     return properties;
   }
 
@@ -44,34 +44,34 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities {
 
   private TypedProperties getPropertiesWithoutRecordKeyProp() {
     TypedProperties properties = new TypedProperties();
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
     return properties;
   }
 
   private TypedProperties getWrongRecordKeyFieldProps() {
     TypedProperties properties = new TypedProperties();
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
-    properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_wrong_key");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
+    properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_wrong_key");
     return properties;
   }
 
   private TypedProperties getWrongPartitionPathFieldProps() {
     TypedProperties properties = new TypedProperties();
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "_wrong_partition_path");
-    properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "_wrong_partition_path");
+    properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");
     return properties;
   }
 
   private TypedProperties getComplexRecordKeyProp() {
     TypedProperties properties = new TypedProperties();
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
-    properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,pii_col");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
+    properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key,pii_col");
     return properties;
   }
 
   private TypedProperties getProps() {
     TypedProperties properties = getCommonProps();
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
+    properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp");
     return properties;
   }
 
diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
similarity index 84%
rename from hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
index 7867415..98a8f67 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
@@ -20,14 +20,14 @@ package org.apache.hudi.keygen;
 
 import org.apache.hudi.AvroConversionHelper;
 import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
-import org.apache.hudi.exception.HoodieDeltaStreamerException;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
 import org.apache.spark.sql.types.StructType;
@@ -58,15 +58,15 @@ public class TestTimestampBasedKeyGenerator {
         .generateAvroRecordFromJson(schema, 1, "001", "f1");
     baseRow = genericRecordToRow(baseRecord);
 
-    properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "field1");
-    properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "createTime");
-    properties.setProperty(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "false");
+    properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "field1");
+    properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "createTime");
+    properties.setProperty(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "false");
   }
 
   private TypedProperties getBaseKeyConfig(String timestampType, String dateFormat, String timezone, String scalarType) {
-    properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType);
-    properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, dateFormat);
-    properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, timezone);
+    properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType);
+    properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, dateFormat);
+    properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, timezone);
 
     if (scalarType != null) {
       properties.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit", scalarType);
@@ -88,22 +88,22 @@ public class TestTimestampBasedKeyGenerator {
 
   private TypedProperties getBaseKeyConfig(String timestampType, String inputFormatList, String inputFormatDelimiterRegex, String inputTimezone, String outputFormat, String outputTimezone) {
     if (timestampType != null) {
-      properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType);
+      properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType);
     }
     if (inputFormatList != null) {
-      properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, inputFormatList);
+      properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, inputFormatList);
     }
     if (inputFormatDelimiterRegex != null) {
-      properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, inputFormatDelimiterRegex);
+      properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, inputFormatDelimiterRegex);
     }
     if (inputTimezone != null) {
-      properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_TIMEZONE_FORMAT_PROP, inputTimezone);
+      properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_TIMEZONE_FORMAT_PROP, inputTimezone);
     }
     if (outputFormat != null) {
-      properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, outputFormat);
+      properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, outputFormat);
     }
     if (outputTimezone != null) {
-      properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, outputTimezone);
+      properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, outputTimezone);
     }
     return properties;
   }
@@ -213,7 +213,7 @@ public class TestTimestampBasedKeyGenerator {
         "",
         "yyyyMMddHH",
         "GMT");
-    KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+    BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
     HoodieKey hk1 = keyGen.getKey(baseRecord);
     Assertions.assertEquals("2020040113", hk1.getPartitionPath());
 
@@ -231,7 +231,7 @@ public class TestTimestampBasedKeyGenerator {
         "",
         "yyyyMMddHH",
         "");
-    KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+    BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
     HoodieKey hk1 = keyGen.getKey(baseRecord);
     Assertions.assertEquals("2020040113", hk1.getPartitionPath());
 
@@ -249,7 +249,7 @@ public class TestTimestampBasedKeyGenerator {
         "",
         "yyyyMMddHH",
         "UTC");
-    KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+    BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
     HoodieKey hk1 = keyGen.getKey(baseRecord);
     Assertions.assertEquals("2020040113", hk1.getPartitionPath());
 
@@ -267,7 +267,7 @@ public class TestTimestampBasedKeyGenerator {
         "",
         "yyyyMMddHH",
         "UTC");
-    KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+    BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
     HoodieKey hk1 = keyGen.getKey(baseRecord);
     Assertions.assertEquals("2020040113", hk1.getPartitionPath());
 
@@ -285,7 +285,7 @@ public class TestTimestampBasedKeyGenerator {
         "",
         "yyyyMMddHH",
         "UTC");
-    KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+    BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
     HoodieKey hk1 = keyGen.getKey(baseRecord);
     Assertions.assertEquals("2020040118", hk1.getPartitionPath());
 
@@ -303,7 +303,7 @@ public class TestTimestampBasedKeyGenerator {
         "",
         "yyyyMMddHH",
         "UTC");
-    KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+    BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
     HoodieKey hk1 = keyGen.getKey(baseRecord);
     Assertions.assertEquals("2020040118", hk1.getPartitionPath());
 
@@ -321,7 +321,7 @@ public class TestTimestampBasedKeyGenerator {
         "",
         "yyyyMMddHH",
         "EST");
-    KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+    BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
     HoodieKey hk1 = keyGen.getKey(baseRecord);
     Assertions.assertEquals("2020040109", hk1.getPartitionPath());
 
@@ -339,11 +339,11 @@ public class TestTimestampBasedKeyGenerator {
         "",
         "yyyyMMddHH",
         "UTC");
-    KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
-    Assertions.assertThrows(HoodieDeltaStreamerException.class, () -> keyGen.getKey(baseRecord));
+    BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+    Assertions.assertThrows(HoodieKeyGeneratorException.class, () -> keyGen.getKey(baseRecord));
 
     baseRow = genericRecordToRow(baseRecord);
-    Assertions.assertThrows(HoodieDeltaStreamerException.class, () -> keyGen.getPartitionPath(baseRow));
+    Assertions.assertThrows(HoodieKeyGeneratorException.class, () -> keyGen.getPartitionPath(baseRow));
   }
 
   @Test
@@ -356,7 +356,7 @@ public class TestTimestampBasedKeyGenerator {
         "UTC",
         "MM/dd/yyyy",
         "UTC");
-    KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
+    BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties);
     HoodieKey hk1 = keyGen.getKey(baseRecord);
     Assertions.assertEquals("04/01/2020", hk1.getPartitionPath());
 
diff --git a/hudi-spark/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java
similarity index 100%
rename from hudi-spark/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java
diff --git a/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java b/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
index b3ed7ae..c820ebe 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.keygen.BuiltinKeyGenerator;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -72,7 +72,7 @@ public class HoodieDatasetBulkInsertHelper {
     TypedProperties properties = new TypedProperties();
     properties.putAll(config.getProps());
     String keyGeneratorClass = properties.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY());
-    KeyGenerator keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, properties);
+    BuiltinKeyGenerator keyGenerator = (BuiltinKeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, properties);
     StructType structTypeForUDF = rows.schema();
 
     sqlContext.udf().register(RECORD_KEY_UDF_FN, (UDF1<Row, String>) keyGenerator::getRecordKey, DataTypes.StringType);
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index ce940f7..fc52b38 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.WriteOperationType
 import org.apache.hudi.hive.HiveSyncTool
 import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
 import org.apache.hudi.keygen.SimpleKeyGenerator
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.log4j.LogManager
 
 /**
@@ -213,14 +214,14 @@ object DataSourceWriteOptions {
     * the dot notation eg: `a.b.c`
     *
     */
-  val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
+  val RECORDKEY_FIELD_OPT_KEY = KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY
   val DEFAULT_RECORDKEY_FIELD_OPT_VAL = "uuid"
 
   /**
     * Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`. Actual
     * value obtained by invoking .toString()
     */
-  val PARTITIONPATH_FIELD_OPT_KEY = "hoodie.datasource.write.partitionpath.field"
+  val PARTITIONPATH_FIELD_OPT_KEY = KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY
   val DEFAULT_PARTITIONPATH_FIELD_OPT_VAL = "partitionpath"
 
   /**
@@ -228,10 +229,10 @@ object DataSourceWriteOptions {
     * If set true, the names of partition folders follow <partition_column_name>=<partition_value> format.
     * By default false (the names of partition folders are only partition values)
     */
-  val HIVE_STYLE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.hive_style_partitioning"
-  val DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL = "false"
-  val URL_ENCODE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.partitionpath.urlencode"
-  val DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL = "false"
+  val HIVE_STYLE_PARTITIONING_OPT_KEY = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY
+  val DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL = KeyGeneratorOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL
+  val URL_ENCODE_PARTITIONING_OPT_KEY = KeyGeneratorOptions.URL_ENCODE_PARTITIONING_OPT_KEY
+  val DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL = KeyGeneratorOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL
   /**
     * Key generator class, whose implementation will extract the key out of the incoming record
     *
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index 32d6a09..1481024 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
-
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
 import org.apache.hadoop.conf.Configuration
diff --git a/hudi-spark/src/test/java/TestComplexKeyGenerator.java b/hudi-spark/src/test/java/TestComplexKeyGenerator.java
deleted file mode 100644
index a5a88c2..0000000
--- a/hudi-spark/src/test/java/TestComplexKeyGenerator.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import static junit.framework.TestCase.assertEquals;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.keygen.ComplexKeyGenerator;
-import org.junit.jupiter.api.Test;
-
-public class TestComplexKeyGenerator {
-
-  @Test
-  public void testSingleValueKeyGenerator() {
-    TypedProperties properties = new TypedProperties();
-    properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
-    properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
-    ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
-    assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 1);
-    assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 1);
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
-    String rowKey = record.get("_row_key").toString();
-    String partitionPath = record.get("timestamp").toString();
-    HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
-    assertEquals("_row_key:" + rowKey, hoodieKey.getRecordKey());
-    assertEquals(partitionPath, hoodieKey.getPartitionPath());
-  }
-
-  @Test
-  public void testMultipleValueKeyGenerator() {
-    TypedProperties properties = new TypedProperties();
-    properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,timestamp");
-    properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "rider,driver");
-    ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
-    assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 2);
-    assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 2);
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
-    String rowKey =
-        "_row_key" + ComplexKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("_row_key").toString() + ","
-            + "timestamp" + ComplexKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("timestamp").toString();
-    String partitionPath = record.get("rider").toString() + "/" + record.get("driver").toString();
-    HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
-    assertEquals(rowKey, hoodieKey.getRecordKey());
-    assertEquals(partitionPath, hoodieKey.getPartitionPath());
-  }
-
-  @Test
-  public void testMultipleValueKeyGeneratorNonPartitioned() {
-    TypedProperties properties = new TypedProperties();
-    properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,timestamp");
-    properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "");
-    ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
-    assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 2);
-    assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 0);
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
-    String rowKey =
-        "_row_key" + ComplexKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("_row_key").toString() + ","
-            + "timestamp" + ComplexKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("timestamp").toString();
-    String partitionPath = "";
-    HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
-    assertEquals(rowKey, hoodieKey.getRecordKey());
-    assertEquals(partitionPath, hoodieKey.getPartitionPath());
-  }
-
-}
\ No newline at end of file
diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
deleted file mode 100644
index 4c5ded3..0000000
--- a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.keygen;
-
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.exception.HoodieKeyException;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
-import org.apache.spark.sql.Row;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
-
-  private TypedProperties getCommonProps(boolean getComplexRecordKey) {
-    TypedProperties properties = new TypedProperties();
-    if (getComplexRecordKey) {
-      properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key, pii_col");
-    } else {
-      properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
-    }
-    properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true");
-    return properties;
-  }
-
-  private TypedProperties getPropertiesWithoutPartitionPathProp() {
-    return getCommonProps(false);
-  }
-
-  private TypedProperties getPropertiesWithoutRecordKeyProp() {
-    TypedProperties properties = new TypedProperties();
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
-    return properties;
-  }
-
-  private TypedProperties getWrongRecordKeyFieldProps() {
-    TypedProperties properties = new TypedProperties();
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
-    properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_wrong_key");
-    return properties;
-  }
-
-  private TypedProperties getProps() {
-    TypedProperties properties = getCommonProps(true);
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp,ts_ms");
-    return properties;
-  }
-
-  @Test
-  public void testNullPartitionPathFields() {
-    Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutPartitionPathProp()));
-  }
-
-  @Test
-  public void testNullRecordKeyFields() {
-    Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp()));
-  }
-
-  @Test
-  public void testWrongRecordKeyField() {
-    ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getWrongRecordKeyFieldProps());
-    Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
-    Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType));
-  }
-
-  @Test
-  public void testHappyFlow() {
-    ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getProps());
-    GenericRecord record = getRecord();
-    HoodieKey key = keyGenerator.getKey(record);
-    Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
-    Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686/ts_ms=2020-03-21");
-    Row row = KeyGeneratorTestUtilities.getRow(record);
-    Assertions.assertEquals(keyGenerator.getRecordKey(row), "_row_key:key1,pii_col:pi");
-    Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686/ts_ms=2020-03-21");
-  }
-}
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala b/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala
index 902359d..e299445 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala
@@ -22,7 +22,6 @@ import java.time.LocalDate
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
-import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.scalatest.{FunSuite, Matchers}
 
@@ -43,7 +42,7 @@ class TestAvroConversionHelper extends FunSuite with Matchers {
 
   test("Logical type: date") {
     val schema = new Schema.Parser().parse(dateSchema)
-    val convertor = AvroConversionHelper.createConverterToRow(schema, convertAvroSchemaToStructType(schema))
+    val convertor = AvroConversionHelper.createConverterToRow(schema, AvroConversionUtils.convertAvroSchemaToStructType(schema))
 
     val dateOutputData = dateInputData.map(x => {
       val record = new GenericData.Record(schema) {{ put("date", x) }}
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala b/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
index 73e1f5d..99e1297 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
@@ -17,8 +17,6 @@
 
 package org.apache.hudi
 
-import java.util
-
 import org.apache.avro.generic.GenericRecord
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.TypedProperties
@@ -236,14 +234,29 @@ class TestDataSourceDefaults {
     assertEquals("name1", keyGen.getPartitionPath(baseRow))
   }
 
-  class UserDefinedKeyGenerator(props: TypedProperties) extends KeyGenerator(props) {
+  class UserDefinedKeyGenerator(props: TypedProperties) extends KeyGenerator(props) with SparkKeyGeneratorInterface {
     val recordKeyProp: String = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY)
     val partitionPathProp: String = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY)
+    val STRUCT_NAME: String = "hoodieRowTopLevelField"
+    val NAMESPACE: String = "hoodieRow"
+    var converterFn: Function1[Any, Any] = _
 
     override def getKey(record: GenericRecord): HoodieKey = {
       new HoodieKey(HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyProp, true),
         HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathProp, true))
     }
+
+    override def getRecordKey(row: Row): String = {
+      if (null == converterFn) converterFn = AvroConversionHelper.createConverterToAvro(row.schema, STRUCT_NAME, NAMESPACE)
+      val genericRecord = converterFn.apply(row).asInstanceOf[GenericRecord]
+      getKey(genericRecord).getRecordKey
+    }
+
+    override def getPartitionPath(row: Row): String = {
+      if (null == converterFn) converterFn = AvroConversionHelper.createConverterToAvro(row.schema, STRUCT_NAME, NAMESPACE)
+      val genericRecord = converterFn.apply(row).asInstanceOf[GenericRecord]
+      getKey(genericRecord).getPartitionPath
+    }
   }
 
   @Test def testComplexKeyGenerator() = {