You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2023/01/16 23:06:00 UTC

[hudi] branch recordKeyGenRefactor created (now b26ffe6600c)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a change to branch recordKeyGenRefactor
in repository https://gitbox.apache.org/repos/asf/hudi.git


      at b26ffe6600c Fixing record key generation so that any key generator class can have any record key generation(simple, custom, auto generation

This branch includes the following new commits:

     new b26ffe6600c Fixing record key generation so that any key generator class can have any record key generation(simple, custom, auto generation

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[hudi] 01/01: Fixing record key generation so that any key generator class can have any record key generation(simple, custom, auto generation

Posted by si...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch recordKeyGenRefactor
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b26ffe6600ce80dbeed7c43e5de72b6ff3ddeb99
Author: sivabalan <n....@gmail.com>
AuthorDate: Mon Jan 16 15:05:41 2023 -0800

    Fixing record key generation so that any key generator class can have any record key generation(simple, custom, auto generation
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  14 ++
 .../apache/hudi/keygen/AutoRecordKeyGenerator.java | 235 +++++++++++++++++++++
 .../hudi/keygen/ComplexAvroKeyGenerator.java       |  11 +-
 .../hudi/keygen/ComplexAvroRecordKeyGenerator.java |  42 ++++
 .../hudi/keygen/GlobalAvroDeleteKeyGenerator.java  |   5 +-
 .../keygen/NonpartitionedAvroKeyGenerator.java     |   8 +-
 .../apache/hudi/keygen/SimpleAvroKeyGenerator.java |  11 +-
 .../hudi/keygen/SimpleAvroRecordKeyGenerator.java  |  40 ++++
 .../keygen/factory/RecordKeyGeneratorFactory.java  |  45 ++++
 .../hudi/keygen/SparkKeyGeneratorInterface.java    |  26 +--
 ....java => SparkRecordKeyGeneratorInterface.java} |  31 +--
 .../org/apache/hudi/keygen/BaseKeyGenerator.java   |   4 +-
 .../org/apache/hudi/keygen/RecordKeyGenerator.java |  32 +++
 13 files changed, 435 insertions(+), 69 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index b70b13c0833..b907905f99c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -549,6 +549,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("When table is upgraded from pre 0.12 to 0.12, we check for \"default\" partition and fail if found one. "
           + "Users are expected to rewrite the data in those partitions. Enabling this config will bypass this validation");
 
+  public static final ConfigProperty<Boolean> AUTO_GENERATE_RECORD_KEYS = ConfigProperty.key("hoodie.auto.generate.record.keys")
+      .defaultValue(false)
+      .sinceVersion("0.13.0")
+      .withDocumentation("to be added");
+
   private ConsistencyGuardConfig consistencyGuardConfig;
   private FileSystemRetryConfig fileSystemRetryConfig;
 
@@ -2201,6 +2206,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(SKIP_DEFAULT_PARTITION_VALIDATION);
   }
 
+  public Boolean doAutoGenerateRecordKeys() {
+    return getBooleanOrDefault(AUTO_GENERATE_RECORD_KEYS);
+  }
+
   /**
    * Are any table services configured to run inline for both scheduling and execution?
    *
@@ -2723,6 +2732,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withAutoGenerateRecordKeys(boolean autoGenerateRecordKeys) {
+      writeConfig.setValue(AUTO_GENERATE_RECORD_KEYS, String.valueOf(autoGenerateRecordKeys));
+      return this;
+    }
+
     protected void setDefaults() {
       writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
       // Check for mandatory properties
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java
new file mode 100644
index 00000000000..6beba686dfc
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/AutoRecordKeyGenerator.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.keygen.CustomAvroKeyGenerator.SPLIT_REGEX;
+
+/**
+ * Auto record key generator. This generator derives the record key from the values of a subset of the record's fields.
+ * Use cases where users are unable to configure record keys can use this auto record key generator.
+ */
+public class AutoRecordKeyGenerator implements RecordKeyGenerator {
+
+  private final TypedProperties config;
+  private static final String HOODIE_PREFIX = "_hoodie";
+  private static final String DOT = ".";
+  private final int maxFieldsToConsider;
+  private final int numFieldsForKey;
+  private final Set<String> partitionFieldNames;
+  private int[][] fieldOrdering;
+
+  public AutoRecordKeyGenerator(TypedProperties config, List<String> partitionPathFields) {
+    this.config = config;
+    this.numFieldsForKey = config.getInteger(KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.key(), KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.defaultValue());
+    // cap the number of fields to order in case of large schemas
+    this.maxFieldsToConsider = numFieldsForKey * 3;
+    this.partitionFieldNames = partitionPathFields.stream().map(field -> field.split(SPLIT_REGEX)[0]).collect(Collectors.toSet());
+  }
+
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    return buildKey(getFieldOrdering(record), record);
+  }
+
+  int[][] getFieldOrdering(GenericRecord genericRecord) {
+    if (fieldOrdering == null) {
+      fieldOrdering = buildFieldOrdering(genericRecord.getSchema().getFields());
+    }
+    return fieldOrdering;
+  }
+
+  /**
+   * Deterministically builds a key for the input value based on the provided fieldOrdering. The first {@link #numFieldsForKey} non-null values will be used to generate a string that is passed to
+   * {@link UUID#nameUUIDFromBytes(byte[])}.
+   * @param fieldOrdering an array of integer arrays. The integer arrays represent paths to a single field within the input object.
+   * @param input the input object that needs a key
+   * @return the string form of a deterministically generated {@link UUID}
+   * @param <T> the input object type
+   */
+  private <T> String buildKey(int[][] fieldOrdering, GenericRecord input) {
+    StringBuilder key = new StringBuilder();
+    int nonNullFields = 0;
+    for (int[] index : fieldOrdering) {
+      Object value = getFieldForRecord(input, index);
+      if (value == null) {
+        continue;
+      }
+      nonNullFields++;
+      key.append(value.hashCode());
+      if (nonNullFields >= numFieldsForKey) {
+        break;
+      }
+    }
+    return UUID.nameUUIDFromBytes(key.toString().getBytes(StandardCharsets.UTF_8)).toString();
+  }
+
+  /**
+   * Gets the value of the field at the specified path within the record.
+   * @param record the input record
+   * @param fieldPath the path to the field as an array of integers representing the field position within the object
+   * @return value at the path
+   */
+  private static Object getFieldForRecord(GenericRecord record, int[] fieldPath) {
+    Object value = record;
+    for (Integer index : fieldPath) {
+      if (value == null) {
+        return null;
+      }
+      value = ((GenericRecord) value).get(index);
+    }
+    return value;
+  }
+
+  private int[][] buildFieldOrdering(List<Schema.Field> initialFields) {
+    PriorityQueue<Pair<int[], Integer>> queue = new PriorityQueue<>(maxFieldsToConsider + 1, RankingComparator.getInstance());
+    Queue<FieldToProcess> fieldsToProcess = new ArrayDeque<>();
+    for (int j = 0; j < initialFields.size(); j++) {
+      fieldsToProcess.offer(new FieldToProcess(new int[]{j}, initialFields.get(j), initialFields.get(j).name()));
+    }
+    while (!fieldsToProcess.isEmpty()) {
+      FieldToProcess fieldToProcess = fieldsToProcess.poll();
+      int[] existingPath = fieldToProcess.getIndexPath();
+      Schema fieldSchema = fieldToProcess.getField().schema();
+      if (fieldSchema.getType() == Schema.Type.UNION) {
+        fieldSchema = fieldSchema.getTypes().get(1);
+      }
+      if (fieldSchema.getType() == Schema.Type.RECORD) {
+        List<Schema.Field> nestedFields = fieldSchema.getFields();
+        for (int i = 0; i < nestedFields.size(); i++) {
+          int[] path = Arrays.copyOf(existingPath, existingPath.length + 1);
+          path[existingPath.length] = i;
+          Schema.Field nestedField = nestedFields.get(i);
+          fieldsToProcess.add(new FieldToProcess(path, nestedField, fieldToProcess.getNamePath() + DOT + nestedField.name()));
+        }
+      } else {
+        // check that field is not used in partitioning
+        if (!partitionFieldNames.contains(fieldToProcess.getNamePath())) {
+          queue.offer(Pair.of(existingPath, getSchemaRanking(fieldToProcess.getField())));
+          if (queue.size() > maxFieldsToConsider) {
+            queue.poll();
+          }
+        }
+      }
+    }
+    Pair<int[], Integer>[] sortedPairs = queue.toArray(new Pair[queue.size()]);
+    Arrays.sort(sortedPairs, RankingComparator.getInstance().reversed());
+    int[][] output = new int[sortedPairs.length][];
+    for (int k = 0; k < sortedPairs.length; k++) {
+      output[k] = sortedPairs[k].getLeft();
+    }
+    return output;
+  }
+
+  private static class FieldToProcess {
+    final int[] indexPath;
+    final Schema.Field field;
+    final String namePath;
+
+    public FieldToProcess(int[] indexPath, Schema.Field field, String namePath) {
+      this.indexPath = indexPath;
+      this.field = field;
+      this.namePath = namePath;
+    }
+
+    public int[] getIndexPath() {
+      return indexPath;
+    }
+
+    public Schema.Field getField() {
+      return field;
+    }
+
+    public String getNamePath() {
+      return namePath;
+    }
+  }
+
+  /**
+   * Ranks the fields by their type.
+   * @param field input field
+   * @return a score of 0 to 4
+   */
+  private int getSchemaRanking(Schema.Field field) {
+    if (field.name().startsWith(HOODIE_PREFIX)) {
+      return 0;
+    }
+    Schema schema = field.schema();
+    if (schema.getType() == Schema.Type.UNION) {
+      schema = schema.getTypes().get(0).getType() == Schema.Type.NULL ? schema.getTypes().get(1) : schema.getTypes().get(0);
+    }
+    Schema.Type type = schema.getType();
+    switch (type) {
+      case LONG:
+        // assumes long with logical type will be a timestamp
+        return schema.getLogicalType() != null ? 4 : 3;
+      case INT:
+        // assumes int with logical type will be a date which will have low variance in a batch
+        return schema.getLogicalType() != null ? 1 : 3;
+      case DOUBLE:
+      case FLOAT:
+        return 3;
+      case BOOLEAN:
+      case MAP:
+      case ARRAY:
+        return 1;
+      default:
+        return 2;
+    }
+  }
+
+  private static class RankingComparator implements Comparator<Pair<int[], Integer>> {
+    private static final RankingComparator INSTANCE = new RankingComparator();
+
+    static RankingComparator getInstance() {
+      return INSTANCE;
+    }
+
+    @Override
+    public int compare(Pair<int[], Integer> o1, Pair<int[], Integer> o2) {
+      int initialResult = o1.getRight().compareTo(o2.getRight());
+      if (initialResult == 0) {
+        // favor the smaller list (less nested value) on ties
+        int sizeResult = Integer.compare(o2.getLeft().length, o1.getLeft().length);
+        if (sizeResult == 0) {
+          return Integer.compare(o2.getLeft()[0], o1.getLeft()[0]);
+        }
+        return sizeResult;
+      }
+      return initialResult;
+    }
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
index 9ff5c522e45..581ddaa90f5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.keygen;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.keygen.factory.RecordKeyGeneratorFactory;
 
 import java.util.Arrays;
 import java.util.stream.Collectors;
@@ -29,22 +30,22 @@ import java.util.stream.Collectors;
  */
 public class ComplexAvroKeyGenerator extends BaseKeyGenerator {
   public static final String DEFAULT_RECORD_KEY_SEPARATOR = ":";
+  private final RecordKeyGenerator recordKeyGenerator;
 
   public ComplexAvroKeyGenerator(TypedProperties props) {
     super(props);
-    this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","))
-        .map(String::trim)
-        .filter(s -> !s.isEmpty())
-        .collect(Collectors.toList());
+    this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","));
     this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()).split(","))
         .map(String::trim)
         .filter(s -> !s.isEmpty())
         .collect(Collectors.toList());
+    this.recordKeyGenerator = RecordKeyGeneratorFactory.getRecordKeyGenerator(config, recordKeyFields, isConsistentLogicalTimestampEnabled(),
+        partitionPathFields);
   }
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
+    return recordKeyGenerator.getRecordKey(record);
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroRecordKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroRecordKeyGenerator.java
new file mode 100644
index 00000000000..cd86b8f0834
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroRecordKeyGenerator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.avro.generic.GenericRecord;
+
+import java.util.List;
+
+/**
+ * Complex record key generator.
+ */
+public class ComplexAvroRecordKeyGenerator implements RecordKeyGenerator {
+
+  private final List<String> recordKeyFields;
+  private final boolean consistentLogicalTimestampEnabled;
+
+  public ComplexAvroRecordKeyGenerator(List<String> recordKeyFields, boolean consistentLogicalTimestampEnabled) {
+    this.recordKeyFields = recordKeyFields;
+    this.consistentLogicalTimestampEnabled = consistentLogicalTimestampEnabled;
+  }
+
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    return KeyGenUtils.getRecordKey(record, recordKeyFields, consistentLogicalTimestampEnabled);
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
index dc0bc3cef2f..ba66b1a32f8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.keygen;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.keygen.factory.RecordKeyGeneratorFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,15 +33,17 @@ import java.util.List;
 public class GlobalAvroDeleteKeyGenerator extends BaseKeyGenerator {
 
   private static final String EMPTY_PARTITION = "";
+  private final RecordKeyGenerator recordKeyGenerator;
 
   public GlobalAvroDeleteKeyGenerator(TypedProperties config) {
     super(config);
     this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","));
+    this.recordKeyGenerator = RecordKeyGeneratorFactory.getRecordKeyGenerator(config, recordKeyFields, isConsistentLogicalTimestampEnabled(), new ArrayList<>());
   }
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
+    return recordKeyGenerator.getRecordKey(record);
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
index 5b5cedcbf88..4efbaf9b857 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.keygen;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.keygen.factory.RecordKeyGeneratorFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -33,12 +34,14 @@ public class NonpartitionedAvroKeyGenerator extends BaseKeyGenerator {
 
   private static final String EMPTY_PARTITION = "";
   private static final List<String> EMPTY_PARTITION_FIELD_LIST = new ArrayList<>();
+  private final RecordKeyGenerator recordKeyGenerator;
 
   public NonpartitionedAvroKeyGenerator(TypedProperties props) {
     super(props);
     this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
         .split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
     this.partitionPathFields = EMPTY_PARTITION_FIELD_LIST;
+    this.recordKeyGenerator = RecordKeyGeneratorFactory.getRecordKeyGenerator(config, recordKeyFields, isConsistentLogicalTimestampEnabled(), partitionPathFields);
   }
 
   @Override
@@ -56,10 +59,7 @@ public class NonpartitionedAvroKeyGenerator extends BaseKeyGenerator {
     // for backward compatibility, we need to use the right format according to the number of record key fields
     // 1. if there is only one record key field, the format of record key is just "<value>"
     // 2. if there are multiple record key fields, the format is "<field1>:<value1>,<field2>:<value2>,..."
-    if (getRecordKeyFieldNames().size() == 1) {
-      return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
-    }
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames(), isConsistentLogicalTimestampEnabled());
+    return recordKeyGenerator.getRecordKey(record);
   }
 
   public String getEmptyPartition() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
index c7398e94ece..85a3fb74f27 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java
@@ -20,7 +20,9 @@ package org.apache.hudi.keygen;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.keygen.factory.RecordKeyGeneratorFactory;
 
+import java.util.Arrays;
 import java.util.Collections;
 
 /**
@@ -28,6 +30,8 @@ import java.util.Collections;
  */
 public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
 
+  private final RecordKeyGenerator recordKeyGenerator;
+
   public SimpleAvroKeyGenerator(TypedProperties props) {
     this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()),
         props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
@@ -39,15 +43,14 @@ public class SimpleAvroKeyGenerator extends BaseKeyGenerator {
 
   SimpleAvroKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) {
     super(props);
-    this.recordKeyFields = recordKeyField == null
-        ? Collections.emptyList()
-        : Collections.singletonList(recordKeyField);
+    this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).split(","));
     this.partitionPathFields = Collections.singletonList(partitionPathField);
+    this.recordKeyGenerator = RecordKeyGeneratorFactory.getRecordKeyGenerator(config, recordKeyFields, isConsistentLogicalTimestampEnabled(), partitionPathFields);
   }
 
   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, getRecordKeyFieldNames().get(0), isConsistentLogicalTimestampEnabled());
+    return recordKeyGenerator.getRecordKey(record);
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroRecordKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroRecordKeyGenerator.java
new file mode 100644
index 00000000000..52bf7ac872b
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroRecordKeyGenerator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.avro.generic.GenericRecord;
+
+/**
+ * Simple record key generator.
+ */
+public class SimpleAvroRecordKeyGenerator implements RecordKeyGenerator {
+
+  private final String recordKeyField;
+  private final boolean consistentLogicalTimestampEnabled;
+
+  public SimpleAvroRecordKeyGenerator(String recordKeyField, boolean consistentLogicalTimestampEnabled) {
+    this.recordKeyField = recordKeyField;
+    this.consistentLogicalTimestampEnabled = consistentLogicalTimestampEnabled;
+  }
+
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    return KeyGenUtils.getRecordKey(record, recordKeyField, consistentLogicalTimestampEnabled);
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/RecordKeyGeneratorFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/RecordKeyGeneratorFactory.java
new file mode 100644
index 00000000000..e884d4db5c4
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/RecordKeyGeneratorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen.factory;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.ComplexAvroRecordKeyGenerator;
+import org.apache.hudi.keygen.AutoRecordKeyGenerator;
+import org.apache.hudi.keygen.RecordKeyGenerator;
+import org.apache.hudi.keygen.SimpleAvroRecordKeyGenerator;
+
+import java.util.List;
+
+/**
+ * Factory to instantiate RecordKeyGenerator.
+ */
+public class RecordKeyGeneratorFactory {
+
+  public static RecordKeyGenerator getRecordKeyGenerator(TypedProperties config, List<String> recordKeyFields, boolean consistentLogicalTimestampEnabled,
+                                                         List<String> partitionPathFields) {
+    if (config.getBoolean(HoodieWriteConfig.AUTO_GENERATE_RECORD_KEYS.key(), HoodieWriteConfig.AUTO_GENERATE_RECORD_KEYS.defaultValue())) {
+      return new AutoRecordKeyGenerator(config, partitionPathFields);
+    } else if (recordKeyFields.size() == 1) {
+      return new SimpleAvroRecordKeyGenerator(recordKeyFields.get(0), consistentLogicalTimestampEnabled);
+    } else {
+      return new ComplexAvroRecordKeyGenerator(recordKeyFields, consistentLogicalTimestampEnabled);
+    }
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
index 977ff709bb1..38ea518508f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
@@ -30,31 +30,7 @@ import org.apache.spark.unsafe.types.UTF8String;
  * specifically implement record-key, partition-path generation w/o the need for (expensive)
  * conversion from Spark internal representation (for ex, to Avro)
  */
-public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface {
-
-  /**
-   * Extracts record key from Spark's {@link Row}
-   *
-   * @param row instance of {@link Row} from which record-key is extracted
-   * @return record's (primary) key
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  String getRecordKey(Row row);
-
-  /**
-   * Extracts record key from Spark's {@link InternalRow}
-   *
-   * NOTE: Difference b/w {@link Row} and {@link InternalRow} is that {@link InternalRow} could
-   *       internally hold just a binary representation of the data, while {@link Row} has it
-   *       deserialized into JVM-native representation (like {@code Integer}, {@code Long},
-   *       {@code String}, etc)
-   *
-   * @param row instance of {@link InternalRow} from which record-key is extracted
-   * @param schema schema {@link InternalRow} is adhering to
-   * @return record-key as instance of {@link UTF8String}
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  UTF8String getRecordKey(InternalRow row, StructType schema);
+public interface SparkKeyGeneratorInterface extends SparkRecordKeyGeneratorInterface {
 
   /**
    * Extracts partition-path from {@link Row}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkRecordKeyGeneratorInterface.java
similarity index 61%
copy from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
copy to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkRecordKeyGeneratorInterface.java
index 977ff709bb1..3d4ccc0de0e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkRecordKeyGeneratorInterface.java
@@ -20,17 +20,16 @@ package org.apache.hudi.keygen;
 
 import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.PublicAPIMethod;
+
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
- * Spark-specific {@link KeyGenerator} interface extension allowing implementation to
- * specifically implement record-key, partition-path generation w/o the need for (expensive)
- * conversion from Spark internal representation (for ex, to Avro)
+ * Spark's record key generator interface to assist in generating record key for a given spark row.
  */
-public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface {
+public interface SparkRecordKeyGeneratorInterface {
 
   /**
    * Extracts record key from Spark's {@link Row}
@@ -55,28 +54,4 @@ public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface {
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   UTF8String getRecordKey(InternalRow row, StructType schema);
-
-  /**
-   * Extracts partition-path from {@link Row}
-   *
-   * @param row instance of {@link Row} from which partition-path is extracted
-   * @return record's partition-path
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  String getPartitionPath(Row row);
-
-  /**
-   * Extracts partition-path from Spark's {@link InternalRow}
-   *
-   * NOTE: Difference b/w {@link Row} and {@link InternalRow} is that {@link InternalRow} could
-   *       internally hold just a binary representation of the data, while {@link Row} has it
-   *       deserialized into JVM-native representation (like {@code Integer}, {@code Long},
-   *       {@code String}, etc)
-   *
-   * @param row instance of {@link InternalRow} from which record-key is extracted
-   * @param schema schema {@link InternalRow} is adhering to
-   * @return partition-path as instance of {@link UTF8String}
-   */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  UTF8String getPartitionPath(InternalRow row, StructType schema);
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
index d0baa903919..e3d5a3b18bb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java
@@ -30,7 +30,7 @@ import java.util.List;
  * Base abstract class to extend for {@link KeyGenerator} with default logic of taking
  * partitioning and timestamp configs.
  */
-public abstract class BaseKeyGenerator extends KeyGenerator {
+public abstract class BaseKeyGenerator extends KeyGenerator implements RecordKeyGenerator {
 
   protected List<String> recordKeyFields;
   protected List<String> partitionPathFields;
@@ -51,7 +51,7 @@ public abstract class BaseKeyGenerator extends KeyGenerator {
   /**
    * Generate a record Key out of provided generic record.
    */
-  public abstract String getRecordKey(GenericRecord record);
+  // NOTE: getRecordKey(GenericRecord) is now declared by the RecordKeyGenerator interface this class implements.
 
   /**
    * Generate a partition path out of provided generic record.
diff --git a/hudi-common/src/main/java/org/apache/hudi/keygen/RecordKeyGenerator.java b/hudi-common/src/main/java/org/apache/hudi/keygen/RecordKeyGenerator.java
new file mode 100644
index 00000000000..2854c5de2d8
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/keygen/RecordKeyGenerator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.avro.generic.GenericRecord;
+
+/**
+ * Interface for generating the record key of a given {@link GenericRecord}.
+ */
+public interface RecordKeyGenerator {
+
+  /**
+   * Generates the record key for the provided generic record.
+   */
+  String getRecordKey(GenericRecord record);
+}