Posted to commits@hudi.apache.org by le...@apache.org on 2022/05/16 03:07:10 UTC

[hudi] branch master updated: [HUDI-3123] consistent hashing index: basic write path (upsert/insert) (#4480)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 61030d8e7a [HUDI-3123] consistent hashing index: basic write path (upsert/insert) (#4480)
61030d8e7a is described below

commit 61030d8e7a5a05e215efed672267ac163b0cbcf6
Author: Yuwei XIAO <yw...@gmail.com>
AuthorDate: Mon May 16 11:07:01 2022 +0800

    [HUDI-3123] consistent hashing index: basic write path (upsert/insert) (#4480)
    
     1. Basic write path (insert/upsert) implementation
     2. Adapt the simple bucket index
---
 .../hudi/client/utils/LazyIterableIterator.java    |   4 +-
 .../org/apache/hudi/config/HoodieIndexConfig.java  |  46 +++-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 +
 .../java/org/apache/hudi/index/HoodieIndex.java    |   6 +-
 .../apache/hudi/index/bucket/BucketIdentifier.java |  49 ++--
 .../index/bucket/BucketIndexLocationMapper.java    |  35 +++
 .../index/bucket/ConsistentBucketIdentifier.java   | 104 +++++++++
 .../hudi/index/bucket/HoodieBucketIndex.java       | 119 ++++------
 .../hudi/index/bucket/HoodieSimpleBucketIndex.java |  99 ++++++++
 .../org/apache/hudi/io/WriteHandleFactory.java     |   3 +-
 .../action/commit/BaseCommitActionExecutor.java    |   2 +-
 ...yout.java => HoodieConsistentBucketLayout.java} |  44 ++--
 .../hudi/table/storage/HoodieDefaultLayout.java    |   7 +-
 .../hudi/table/storage/HoodieLayoutFactory.java    |   9 +-
 ...etLayout.java => HoodieSimpleBucketLayout.java} |  32 +--
 .../hudi/table/storage/HoodieStorageLayout.java    |   2 +-
 .../hudi/index/bucket/TestBucketIdentifier.java    | 122 ++++++++++
 .../bucket/TestConsistentBucketIdIdentifier.java   |  79 +++++++
 .../apache/hudi/client/SparkRDDWriteClient.java    |   2 +-
 .../java/org/apache/hudi/data/HoodieJavaRDD.java   |   5 +
 .../apache/hudi/index/SparkHoodieIndexFactory.java |  16 +-
 .../bucket/HoodieSparkConsistentBucketIndex.java   | 210 +++++++++++++++++
 .../functional/TestConsistentBucketIndex.java      | 250 +++++++++++++++++++++
 .../hudi/client/functional/TestHoodieIndex.java    |   3 +
 .../apache/hudi/index/TestHoodieIndexConfigs.java  |  14 +-
 ...Index.java => TestHoodieSimpleBucketIndex.java} |  17 +-
 .../commit/TestCopyOnWriteActionExecutor.java      |   5 +-
 .../org/apache/hudi/common/data/HoodieData.java    |   9 +
 .../org/apache/hudi/common/data/HoodieList.java    |   5 +
 .../java/org/apache/hudi/common/fs/FSUtils.java    |   4 +
 .../hudi/common/model/ConsistentHashingNode.java   |  78 +++++++
 .../hudi/common/model/HoodieCommitMetadata.java    |  23 +-
 .../model/HoodieConsistentHashingMetadata.java     | 142 ++++++++++++
 .../common/model/HoodieReplaceCommitMetadata.java  |  17 +-
 .../common/model/HoodieRollingStatMetadata.java    |   4 +-
 .../hudi/common/table/HoodieTableMetaClient.java   |   8 +
 .../org/apache/hudi/common/util/JsonUtils.java     |  38 ++++
 .../org/apache/hudi/common/util/hash/HashID.java   |   9 +
 .../model/TestHoodieConsistentHashingMetadata.java |  25 +--
 .../common/testutils/HoodieCommonTestHarness.java  |   4 +
 .../hudi/index/bucket/TestBucketIdentifier.java    |  67 ------
 41 files changed, 1444 insertions(+), 277 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyIterableIterator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyIterableIterator.java
index 020944e7ab..ad54f8c0a0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyIterableIterator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyIterableIterator.java
@@ -45,7 +45,7 @@ public abstract class LazyIterableIterator<I, O> implements Iterable<O>, Iterato
   /**
    * Called once, before any elements are processed.
    */
-  protected abstract void start();
+  protected void start() {}
 
   /**
    * Block computation to be overwritten by sub classes.
@@ -55,7 +55,7 @@ public abstract class LazyIterableIterator<I, O> implements Iterable<O>, Iterato
   /**
    * Called once, after all elements are processed.
    */
-  protected abstract void end();
+  protected void end() {}
 
   //////////////////
   // iterable implementation
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 7c1f7e00e7..dbd45b9738 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -216,19 +216,40 @@ public class HoodieIndexConfig extends HoodieConfig {
   /**
    * ***** Bucket Index Configs *****
    * Bucket Index is targeted to locate the record fast by hash in big data scenarios.
-   * The current implementation is a basic version, so there are some constraints:
-   * 1. Unsupported operation: bulk insert, cluster and so on.
-   * 2. Bucket num change requires rewriting the partition.
-   * 3. Predict the table size and future data growth well to set a reasonable bucket num.
-   * 4. A bucket size is recommended less than 3GB and avoid bing too small.
-   * more details and progress see [HUDI-3039].
-   */
-  // Bucket num equals file groups num in each partition.
-  // Bucket num can be set according to partition size and file group size.
+   * A bucket size less than 3GB is recommended; also avoid making buckets too small.
+   * For more details and progress, see [HUDI-3039].
+   */
+
+  /**
+   * Bucket Index Engine Type: implementation of bucket index
+   *
+   * SIMPLE:
+   *  0. Check `HoodieSimpleBucketLayout` for its supported operations.
+   *  1. The bucket number is fixed; changing it requires rewriting the partition.
+   *
+   * CONSISTENT_HASHING:
+   *  0. Check `HoodieConsistentBucketLayout` for its supported operations.
+   *  1. The bucket number auto-adjusts by running clustering (still in progress)
+   */
+  public static final ConfigProperty<String> BUCKET_INDEX_ENGINE_TYPE = ConfigProperty
+      .key("hoodie.index.bucket.engine")
+      .defaultValue("SIMPLE")
+      .sinceVersion("0.11.0")
+      .withDocumentation("Type of bucket index engine to use. Default is SIMPLE bucket index, with fixed number of bucket."
+          + "Possible options are [SIMPLE | CONSISTENT_HASHING]."
+          + "Consistent hashing supports dynamic resizing of the number of bucket, solving potential data skew and file size "
+          + "issues of the SIMPLE hashing engine.");
+
+  /**
+   * The bucket number equals the number of file groups in each partition.
+   * It can be set according to the partition size and the desired file group size.
+   *
+   * In dynamic bucket index cases (e.g., CONSISTENT_HASHING), this config serves as the initial bucket number.
+   */
   public static final ConfigProperty<Integer> BUCKET_INDEX_NUM_BUCKETS = ConfigProperty
       .key("hoodie.bucket.index.num.buckets")
       .defaultValue(256)
-      .withDocumentation("Only applies if index type is BUCKET_INDEX. Determine the number of buckets in the hudi table, "
+      .withDocumentation("Only applies if index type is BUCKET. Determine the number of buckets in the hudi table, "
           + "and each partition is divided to N buckets.");
 
   public static final ConfigProperty<String> BUCKET_INDEX_HASH_FIELD = ConfigProperty
@@ -463,6 +484,11 @@ public class HoodieIndexConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType bucketType) {
+      hoodieIndexConfig.setValue(BUCKET_INDEX_ENGINE_TYPE, bucketType.name());
+      return this;
+    }
+
     public Builder withIndexClass(String indexClass) {
       hoodieIndexConfig.setValue(INDEX_CLASS_NAME, indexClass);
       return this;
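
A minimal sketch of how these options could be set when writing to a table. The hoodie.index.bucket.engine and hoodie.bucket.index.num.buckets keys come from the definitions above; hoodie.index.type is Hudi's existing index type option. The wiring of the properties into the writer is omitted and depends on the client used, so treat this as an illustration rather than the definitive setup.

    import java.util.Properties;

    public class BucketIndexConfigExample {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Select the bucket index and the new consistent hashing engine (default engine is SIMPLE).
        props.setProperty("hoodie.index.type", "BUCKET");
        props.setProperty("hoodie.index.bucket.engine", "CONSISTENT_HASHING");
        // Initial number of buckets per partition (default 256); later resizing is done via clustering.
        props.setProperty("hoodie.bucket.index.num.buckets", "256");
        System.out.println(props);
      }
    }
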
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 322c2e84e7..3eeb99044b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1428,6 +1428,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(HoodieIndexConfig.INDEX_CLASS_NAME);
   }
 
+  public HoodieIndex.BucketIndexEngineType getBucketIndexEngineType() {
+    return HoodieIndex.BucketIndexEngineType.valueOf(getString(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE));
+  }
+
   public int getBloomFilterNumEntries() {
     return getInt(HoodieIndexConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE);
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java
index 922371c4a0..1182c45c72 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java
@@ -121,7 +121,7 @@ public abstract class HoodieIndex<I, O> implements Serializable {
   public abstract boolean isImplicitWithStorage();
 
   /**
-   * If the `getCustomizedPartitioner` returns a partitioner, it has to be true.
+   * Indicates whether an operation type requires location tagging before writing.
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public boolean requiresTagging(WriteOperationType operationType) {
@@ -143,4 +143,8 @@ public abstract class HoodieIndex<I, O> implements Serializable {
   public enum IndexType {
     HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, BUCKET, FLINK_STATE
   }
+
+  public enum BucketIndexEngineType {
+    SIMPLE, CONSISTENT_HASHING
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index 1a07c4063f..1f233b4297 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -29,8 +30,8 @@ import java.util.Map;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-public class BucketIdentifier {
-  // compatible with the spark bucket name
+public class BucketIdentifier implements Serializable {
+  // Compatible with the spark bucket name
   private static final Pattern BUCKET_NAME = Pattern.compile(".*_(\\d+)(?:\\..*)?$");
 
   public static int getBucketId(HoodieRecord record, String indexKeyFields, int numBuckets) {
@@ -38,27 +39,41 @@ public class BucketIdentifier {
   }
 
   public static int getBucketId(HoodieKey hoodieKey, String indexKeyFields, int numBuckets) {
-    return getBucketId(hoodieKey.getRecordKey(), indexKeyFields, numBuckets);
+    return (getHashKeys(hoodieKey, indexKeyFields).hashCode() & Integer.MAX_VALUE) % numBuckets;
+  }
+
+  public static int getBucketId(HoodieKey hoodieKey, List<String> indexKeyFields, int numBuckets) {
+    return (getHashKeys(hoodieKey.getRecordKey(), indexKeyFields).hashCode() & Integer.MAX_VALUE) % numBuckets;
   }
 
   public static int getBucketId(String recordKey, String indexKeyFields, int numBuckets) {
-    List<String> hashKeyFields;
-    if (!recordKey.contains(":")) {
-      hashKeyFields = Collections.singletonList(recordKey);
-    } else {
-      Map<String, String> recordKeyPairs = Arrays.stream(recordKey.split(","))
-          .map(p -> p.split(":"))
-          .collect(Collectors.toMap(p -> p[0], p -> p[1]));
-      hashKeyFields = Arrays.stream(indexKeyFields.split(","))
-          .map(f -> recordKeyPairs.get(f))
-          .collect(Collectors.toList());
-    }
-    return (hashKeyFields.hashCode() & Integer.MAX_VALUE) % numBuckets;
+    return getBucketId(getHashKeys(recordKey, indexKeyFields), numBuckets);
   }
 
-  // only for test
   public  static int getBucketId(List<String> hashKeyFields, int numBuckets) {
-    return hashKeyFields.hashCode() % numBuckets;
+    return (hashKeyFields.hashCode() & Integer.MAX_VALUE) % numBuckets;
+  }
+
+  public static List<String> getHashKeys(HoodieKey hoodieKey, String indexKeyFields) {
+    return getHashKeys(hoodieKey.getRecordKey(), indexKeyFields);
+  }
+
+  protected static List<String> getHashKeys(String recordKey, String indexKeyFields) {
+    return !recordKey.contains(":") ? Collections.singletonList(recordKey) :
+        getHashKeysUsingIndexFields(recordKey, Arrays.asList(indexKeyFields.split(",")));
+  }
+
+  protected static List<String> getHashKeys(String recordKey, List<String> indexKeyFields) {
+    return !recordKey.contains(":") ? Collections.singletonList(recordKey) :
+        getHashKeysUsingIndexFields(recordKey, indexKeyFields);
+  }
+
+  private static List<String> getHashKeysUsingIndexFields(String recordKey, List<String> indexKeyFields) {
+    Map<String, String> recordKeyPairs = Arrays.stream(recordKey.split(","))
+        .map(p -> p.split(":"))
+        .collect(Collectors.toMap(p -> p[0], p -> p[1]));
+    return indexKeyFields.stream()
+        .map(f -> recordKeyPairs.get(f)).collect(Collectors.toList());
   }
 
   public static String partitionBucketIdStr(String partition, int bucketId) {
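
The refactored helpers above centralize hash-key extraction: a composite record key of the form "f1:v1,f2:v2" is split into field/value pairs and only the configured index fields are hashed, while a plain key is hashed as-is, and the bucket id is the masked hash code modulo the bucket number. A self-contained sketch of that behavior (mirroring getHashKeys/getBucketId, without depending on the Hudi classes):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class HashKeySketch {
      // Mirrors BucketIdentifier.getHashKeys: composite keys look like "f1:v1,f2:v2".
      static List<String> hashKeys(String recordKey, List<String> indexKeyFields) {
        if (!recordKey.contains(":")) {
          return Collections.singletonList(recordKey);
        }
        Map<String, String> pairs = Arrays.stream(recordKey.split(","))
            .map(p -> p.split(":"))
            .collect(Collectors.toMap(p -> p[0], p -> p[1]));
        return indexKeyFields.stream().map(pairs::get).collect(Collectors.toList());
      }

      // Mirrors BucketIdentifier.getBucketId(List<String>, int): the mask keeps the value non-negative.
      static int bucketId(List<String> keys, int numBuckets) {
        return (keys.hashCode() & Integer.MAX_VALUE) % numBuckets;
      }

      public static void main(String[] args) {
        List<String> keys = hashKeys("f1:abc,f2:bcd", Arrays.asList("f2"));
        System.out.println(keys);              // [bcd]
        System.out.println(bucketId(keys, 8)); // a stable value in [0, 8)
      }
    }
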
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIndexLocationMapper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIndexLocationMapper.java
new file mode 100644
index 0000000000..4955087333
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIndexLocationMapper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.Option;
+
+import java.io.Serializable;
+
+public interface BucketIndexLocationMapper extends Serializable {
+
+  /**
+   * Get record location given hoodie key and partition path
+   */
+  Option<HoodieRecordLocation> getRecordLocation(HoodieKey key, String partitionPath);
+
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java
new file mode 100644
index 0000000000..c44a8a6ccf
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.hash.HashID;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class ConsistentBucketIdentifier extends BucketIdentifier {
+
+  /**
+   * Hashing metadata of a partition
+   */
+  private final HoodieConsistentHashingMetadata metadata;
+  /**
+   * In-memory structure to speed up ring mapping (hashing value -> hashing node)
+   */
+  private final TreeMap<Integer, ConsistentHashingNode> ring;
+  /**
+   * Mapping from fileId -> hashing node
+   */
+  private final Map<String, ConsistentHashingNode> fileIdToBucket;
+
+  public ConsistentBucketIdentifier(HoodieConsistentHashingMetadata metadata) {
+    this.metadata = metadata;
+    this.fileIdToBucket = new HashMap<>();
+    this.ring = new TreeMap<>();
+    initialize();
+  }
+
+  public Collection<ConsistentHashingNode> getNodes() {
+    return ring.values();
+  }
+
+  public HoodieConsistentHashingMetadata getMetadata() {
+    return metadata;
+  }
+
+  public int getNumBuckets() {
+    return ring.size();
+  }
+
+  /**
+   * Get bucket of the given file group
+   *
+   * @param fileId the file group id. NOTE: not filePfx (i.e., uuid)
+   */
+  public ConsistentHashingNode getBucketByFileId(String fileId) {
+    return fileIdToBucket.get(fileId);
+  }
+
+  public ConsistentHashingNode getBucket(HoodieKey hoodieKey, List<String> indexKeyFields) {
+    return getBucket(getHashKeys(hoodieKey.getRecordKey(), indexKeyFields));
+  }
+
+  protected ConsistentHashingNode getBucket(List<String> hashKeys) {
+    int hashValue = HashID.getXXHash32(String.join("", hashKeys), 0);
+    return getBucket(hashValue & HoodieConsistentHashingMetadata.HASH_VALUE_MASK);
+  }
+
+  protected ConsistentHashingNode getBucket(int hashValue) {
+    SortedMap<Integer, ConsistentHashingNode> tailMap = ring.tailMap(hashValue);
+    return tailMap.isEmpty() ? ring.firstEntry().getValue() : tailMap.get(tailMap.firstKey());
+  }
+
+  /**
+   * Initialize the data structures needed for bucket identification.
+   * Specifically, we construct:
+   * - An in-memory tree (ring) to speed up range-mapping lookups.
+   * - A hash table (fileIdToBucket) to look up a bucket by fileId.
+   */
+  private void initialize() {
+    for (ConsistentHashingNode p : metadata.getNodes()) {
+      ring.put(p.getValue(), p);
+      // One bucket has only one file group, so append 0 directly
+      fileIdToBucket.put(FSUtils.createNewFileId(p.getFileIdPrefix(), 0), p);
+    }
+  }
+}
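
The ring lookup above follows the standard consistent hashing pattern: each node owns the range of hash values ending at its own value, a key maps to the first node at or after its hash, and lookups past the largest node wrap around to the first one. A plain-TreeMap sketch of the same idea, using the node values from the unit test further below (independent of the Hudi classes):

    import java.util.Map;
    import java.util.TreeMap;

    public class RingLookupSketch {
      public static void main(String[] args) {
        // Node hash value -> file id prefix, as in ConsistentHashingNode(value, fileIdPrefix).
        TreeMap<Integer, String> ring = new TreeMap<>();
        ring.put(100, "0");
        ring.put(0x2fffffff, "1");
        ring.put(0x4fffffff, "2");

        int[] probes = {0, 100, 101, 0x2fffffff, 0x40000000, 0x50000000};
        for (int hash : probes) {
          // First node at or after the hash; wrap to the first node if none exists.
          Map.Entry<Integer, String> node = ring.ceilingEntry(hash);
          String owner = (node == null) ? ring.firstEntry().getValue() : node.getValue();
          System.out.println(hash + " -> node " + owner);
        }
      }
    }
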
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
index a243eea767..c3584d234a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java
@@ -26,9 +26,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndexUtils;
@@ -37,28 +35,31 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Arrays;
+import java.util.List;
 
 /**
  * Hash indexing mechanism.
  */
-public class HoodieBucketIndex extends HoodieIndex<Object, Object> {
+public abstract class HoodieBucketIndex extends HoodieIndex<Object, Object> {
 
-  private static final Logger LOG =  LogManager.getLogger(HoodieBucketIndex.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieBucketIndex.class);
 
-  private final int numBuckets;
+  protected final int numBuckets;
+  protected final List<String> indexKeyFields;
 
   public HoodieBucketIndex(HoodieWriteConfig config) {
     super(config);
-    numBuckets = config.getBucketIndexNumBuckets();
-    LOG.info("use bucket index, numBuckets=" + numBuckets);
+
+    this.numBuckets = config.getBucketIndexNumBuckets();
+    this.indexKeyFields = Arrays.asList(config.getBucketIndexHashField().split(","));
+    LOG.info("Use bucket index, numBuckets = " + numBuckets + ", indexFields: " + indexKeyFields);
   }
 
   @Override
   public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses,
-      HoodieEngineContext context,
-      HoodieTable hoodieTable)
+                                                HoodieEngineContext context,
+                                                HoodieTable hoodieTable)
       throws HoodieIndexException {
     return writeStatuses;
   }
@@ -68,62 +69,35 @@ public class HoodieBucketIndex extends HoodieIndex<Object, Object> {
       HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
       HoodieTable hoodieTable)
       throws HoodieIndexException {
-    HoodieData<HoodieRecord<R>> taggedRecords = records.mapPartitions(recordIter -> {
-      // partitionPath -> bucketId -> fileInfo
-      Map<String, Map<Integer, Pair<String, String>>> partitionPathFileIDList = new HashMap<>();
-      return new LazyIterableIterator<HoodieRecord<R>, HoodieRecord<R>>(recordIter) {
-
-        @Override
-        protected void start() {
-
-        }
-
-        @Override
-        protected HoodieRecord<R> computeNext() {
-          HoodieRecord record = recordIter.next();
-          int bucketId = BucketIdentifier.getBucketId(record, config.getBucketIndexHashField(), numBuckets);
-          String partitionPath = record.getPartitionPath();
-          if (!partitionPathFileIDList.containsKey(partitionPath)) {
-            partitionPathFileIDList.put(partitionPath, loadPartitionBucketIdFileIdMapping(hoodieTable, partitionPath));
-          }
-          if (partitionPathFileIDList.get(partitionPath).containsKey(bucketId)) {
-            Pair<String, String> fileInfo = partitionPathFileIDList.get(partitionPath).get(bucketId);
-            return HoodieIndexUtils.getTaggedRecord(record, Option.of(
-                new HoodieRecordLocation(fileInfo.getRight(), fileInfo.getLeft())
-            ));
+    // Initialize necessary information before tagging, e.g., hashing metadata
+    List<String> partitions = records.map(HoodieRecord::getPartitionPath).distinct().collectAsList();
+    LOG.info("Initializing hashing metadata for partitions: " + partitions);
+    BucketIndexLocationMapper mapper = getLocationMapper(hoodieTable, partitions);
+
+    return records.mapPartitions(iterator ->
+        new LazyIterableIterator<HoodieRecord<R>, HoodieRecord<R>>(iterator) {
+          @Override
+          protected HoodieRecord<R> computeNext() {
+            // TODO maybe batch the operation to improve performance
+            HoodieRecord record = inputItr.next();
+            Option<HoodieRecordLocation> loc = mapper.getRecordLocation(record.getKey(), record.getPartitionPath());
+            return HoodieIndexUtils.getTaggedRecord(record, loc);
           }
-          return record;
-        }
-
-        @Override
-        protected void end() {
-
         }
-      };
-    }, true);
-    return taggedRecords;
+    );
   }
 
-  private Map<Integer, Pair<String, String>> loadPartitionBucketIdFileIdMapping(
-      HoodieTable hoodieTable,
-      String partition) {
-    // bucketId -> fileIds
-    Map<Integer, Pair<String, String>> fileIDList = new HashMap<>();
-    HoodieIndexUtils
-        .getLatestBaseFilesForPartition(partition, hoodieTable)
-        .forEach(file -> {
-          String fileId = file.getFileId();
-          String commitTime = file.getCommitTime();
-          int bucketId = BucketIdentifier.bucketIdFromFileId(fileId);
-          if (!fileIDList.containsKey(bucketId)) {
-            fileIDList.put(bucketId, Pair.of(fileId, commitTime));
-          } else {
-            // check if bucket data is valid
-            throw new HoodieIOException("Find multiple files at partition path="
-                + partition + " belongs to the same bucket id = " + bucketId);
-          }
-        });
-    return fileIDList;
+  @Override
+  public boolean requiresTagging(WriteOperationType operationType) {
+    switch (operationType) {
+      case INSERT:
+      case INSERT_OVERWRITE:
+      case UPSERT:
+      case DELETE:
+        return true;
+      default:
+        return false;
+    }
   }
 
   @Override
@@ -138,7 +112,7 @@ public class HoodieBucketIndex extends HoodieIndex<Object, Object> {
 
   @Override
   public boolean canIndexLogFiles() {
-    return false;
+    return true;
   }
 
   @Override
@@ -146,19 +120,12 @@ public class HoodieBucketIndex extends HoodieIndex<Object, Object> {
     return true;
   }
 
-  @Override
-  public boolean requiresTagging(WriteOperationType operationType) {
-    switch (operationType) {
-      case INSERT:
-      case INSERT_OVERWRITE:
-      case UPSERT:
-        return true;
-      default:
-        return false;
-    }
-  }
-
   public int getNumBuckets() {
     return numBuckets;
   }
+
+  /**
+   * Get a location mapper for the given table & partitionPath
+   */
+  protected abstract BucketIndexLocationMapper getLocationMapper(HoodieTable table, List<String> partitionPath);
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
new file mode 100644
index 0000000000..92ac4f69b2
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Simple bucket index implementation, with fixed bucket number.
+ */
+public class HoodieSimpleBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieSimpleBucketIndex.class);
+
+  public HoodieSimpleBucketIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  private Map<Integer, HoodieRecordLocation> loadPartitionBucketIdFileIdMapping(
+      HoodieTable hoodieTable,
+      String partition) {
+    // bucketId -> fileIds
+    Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping = new HashMap<>();
+    hoodieTable.getMetaClient().reloadActiveTimeline();
+    HoodieIndexUtils
+        .getLatestBaseFilesForPartition(partition, hoodieTable)
+        .forEach(file -> {
+          String fileId = file.getFileId();
+          String commitTime = file.getCommitTime();
+          int bucketId = BucketIdentifier.bucketIdFromFileId(fileId);
+          if (!bucketIdToFileIdMapping.containsKey(bucketId)) {
+            bucketIdToFileIdMapping.put(bucketId, new HoodieRecordLocation(commitTime, fileId));
+          } else {
+            // Check if bucket data is valid
+            throw new HoodieIOException("Find multiple files at partition path="
+                + partition + " belongs to the same bucket id = " + bucketId);
+          }
+        });
+    return bucketIdToFileIdMapping;
+  }
+
+  @Override
+  public boolean canIndexLogFiles() {
+    return false;
+  }
+
+  @Override
+  protected BucketIndexLocationMapper getLocationMapper(HoodieTable table, List<String> partitionPath) {
+    return new SimpleBucketIndexLocationMapper(table, partitionPath);
+  }
+
+  public class SimpleBucketIndexLocationMapper implements BucketIndexLocationMapper {
+
+    /**
+     * Mapping from partitionPath -> bucketId -> fileInfo
+     */
+    private final Map<String, Map<Integer, HoodieRecordLocation>> partitionPathFileIDList;
+
+    public SimpleBucketIndexLocationMapper(HoodieTable table, List<String> partitions) {
+      partitionPathFileIDList = partitions.stream().collect(Collectors.toMap(p -> p, p -> loadPartitionBucketIdFileIdMapping(table, p)));
+    }
+
+    @Override
+    public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key, String partitionPath) {
+      int bucketId = BucketIdentifier.getBucketId(key, indexKeyFields, numBuckets);
+      Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping = partitionPathFileIDList.get(partitionPath);
+      return Option.ofNullable(bucketIdToFileIdMapping.getOrDefault(bucketId, null));
+    }
+  }
+}
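
The location mapper above resolves a record's location from naming conventions alone: the bucket id is derived by hashing the record key, and the matching file group is found because each fileId in a simple-bucket table embeds its bucket id (see BucketIdentifier.bucketIdFromFileId). A rough, self-contained sketch of that lookup; the fileId value and the single-field record key are illustrative, and the per-partition map is assumed to have been built from the latest base files, as loadPartitionBucketIdFileIdMapping does above.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    public class SimpleBucketLookupSketch {
      public static void main(String[] args) {
        int numBuckets = 8;
        // bucketId -> fileId for one partition; assumed to be parsed out of existing base file names.
        Map<Integer, String> bucketIdToFileId = new HashMap<>();
        bucketIdToFileId.put(3, "00000003-0000-0000-0000-000000000000-0");

        // Single-field record key, so the hash key list is just the key itself
        // (mirrors BucketIdentifier.getBucketId for keys without ':').
        String recordKey = "key1";
        int bucketId = (Collections.singletonList(recordKey).hashCode() & Integer.MAX_VALUE) % numBuckets;

        // Existing bucket -> update that file group; otherwise the record starts a new file group.
        Optional<String> location = Optional.ofNullable(bucketIdToFileId.get(bucketId));
        System.out.println("bucket " + bucketId + " -> " + location.orElse("<new file group>"));
      }
    }
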
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java
index 36fae304d7..c267b5969d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/WriteHandleFactory.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.io;
 
 import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
@@ -32,6 +33,6 @@ public abstract class WriteHandleFactory<T extends HoodieRecordPayload, I, K, O>
       String partitionPath, String fileIdPrefix, TaskContextSupplier taskContextSupplier);
 
   protected String getNextFileId(String idPfx) {
-    return String.format("%s-%d", idPfx, numFilesWritten++);
+    return FSUtils.createNewFileId(idPfx, numFilesWritten++);
   }
 }
\ No newline at end of file
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index fb07d35928..31c8bbd6d3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -94,7 +94,7 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload, I,
     this.lastCompletedTxn = TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient());
     this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(table.getMetaClient());
     this.pendingInflightAndRequestedInstants.remove(instantTime);
-    if (table.getStorageLayout().doesNotSupport(operationType)) {
+    if (!table.getStorageLayout().writeOperationSupported(operationType)) {
       throw new UnsupportedOperationException("Executor " + this.getClass().getSimpleName()
           + " is not compatible with table layout " + table.getStorageLayout().getClass().getSimpleName());
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieBucketLayout.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieConsistentBucketLayout.java
similarity index 56%
copy from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieBucketLayout.java
copy to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieConsistentBucketLayout.java
index deefcfe6a6..0ed2b9c939 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieBucketLayout.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieConsistentBucketLayout.java
@@ -19,31 +19,28 @@
 package org.apache.hudi.table.storage;
 
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieLayoutConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 
-import java.util.HashSet;
 import java.util.Set;
 
 /**
- * Storage layout when using bucket index. Data distribution and files organization are in a specific way.
+ * Storage layout when using consistent hashing bucket index.
  */
-public class HoodieBucketLayout extends HoodieStorageLayout {
+public class HoodieConsistentBucketLayout extends HoodieStorageLayout {
+  public static final Set<WriteOperationType> SUPPORTED_OPERATIONS = CollectionUtils.createImmutableSet(
+      WriteOperationType.INSERT,
+      WriteOperationType.INSERT_PREPPED,
+      WriteOperationType.UPSERT,
+      WriteOperationType.UPSERT_PREPPED,
+      WriteOperationType.INSERT_OVERWRITE,
+      WriteOperationType.DELETE,
+      WriteOperationType.COMPACT,
+      WriteOperationType.DELETE_PARTITION
+  );
 
-  public static final Set<WriteOperationType> SUPPORTED_OPERATIONS = new HashSet<WriteOperationType>() {{
-      add(WriteOperationType.INSERT);
-      add(WriteOperationType.INSERT_PREPPED);
-      add(WriteOperationType.UPSERT);
-      add(WriteOperationType.UPSERT_PREPPED);
-      add(WriteOperationType.INSERT_OVERWRITE);
-      add(WriteOperationType.DELETE);
-      add(WriteOperationType.COMPACT);
-      add(WriteOperationType.DELETE_PARTITION);
-    }
-  };
-
-  public HoodieBucketLayout(HoodieWriteConfig config) {
+  public HoodieConsistentBucketLayout(HoodieWriteConfig config) {
     super(config);
   }
 
@@ -55,14 +52,17 @@ public class HoodieBucketLayout extends HoodieStorageLayout {
     return true;
   }
 
+  /**
+   * Consistent hashing will tag all incoming records, so we can reuse an existing Partitioner.
+   */
+  @Override
   public Option<String> layoutPartitionerClass() {
-    return config.contains(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME)
-        ? Option.of(config.getString(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME.key()))
-        : Option.empty();
+    return Option.empty();
   }
 
   @Override
-  public boolean doesNotSupport(WriteOperationType operationType) {
-    return !SUPPORTED_OPERATIONS.contains(operationType);
+  public boolean writeOperationSupported(WriteOperationType operationType) {
+    return SUPPORTED_OPERATIONS.contains(operationType);
   }
+
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieDefaultLayout.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieDefaultLayout.java
index 09d20707a4..28fe37c9b8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieDefaultLayout.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieDefaultLayout.java
@@ -31,15 +31,18 @@ public class HoodieDefaultLayout extends HoodieStorageLayout {
     super(config);
   }
 
+  @Override
   public boolean determinesNumFileGroups() {
     return false;
   }
 
+  @Override
   public Option<String> layoutPartitionerClass() {
     return Option.empty();
   }
 
-  public boolean doesNotSupport(WriteOperationType operationType) {
-    return false;
+  @Override
+  public boolean writeOperationSupported(WriteOperationType operationType) {
+    return true;
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieLayoutFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieLayoutFactory.java
index e86d253df4..e78c15b3a4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieLayoutFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieLayoutFactory.java
@@ -30,7 +30,14 @@ public final class HoodieLayoutFactory {
       case DEFAULT:
         return new HoodieDefaultLayout(config);
       case BUCKET:
-        return new HoodieBucketLayout(config);
+        switch (config.getBucketIndexEngineType()) {
+          case SIMPLE:
+            return new HoodieSimpleBucketLayout(config);
+          case CONSISTENT_HASHING:
+            return new HoodieConsistentBucketLayout(config);
+          default:
+            throw new HoodieNotSupportedException("Unknown bucket index engine type: " + config.getBucketIndexEngineType());
+        }
       default:
         throw new HoodieNotSupportedException("Unknown layout type, set " + config.getLayoutType());
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieBucketLayout.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieSimpleBucketLayout.java
similarity index 71%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieBucketLayout.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieSimpleBucketLayout.java
index deefcfe6a6..be048a23b0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieBucketLayout.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieSimpleBucketLayout.java
@@ -19,31 +19,30 @@
 package org.apache.hudi.table.storage;
 
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieLayoutConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 
-import java.util.HashSet;
 import java.util.Set;
 
 /**
  * Storage layout when using bucket index. Data distribution and files organization are in a specific way.
  */
-public class HoodieBucketLayout extends HoodieStorageLayout {
+public class HoodieSimpleBucketLayout extends HoodieStorageLayout {
 
-  public static final Set<WriteOperationType> SUPPORTED_OPERATIONS = new HashSet<WriteOperationType>() {{
-      add(WriteOperationType.INSERT);
-      add(WriteOperationType.INSERT_PREPPED);
-      add(WriteOperationType.UPSERT);
-      add(WriteOperationType.UPSERT_PREPPED);
-      add(WriteOperationType.INSERT_OVERWRITE);
-      add(WriteOperationType.DELETE);
-      add(WriteOperationType.COMPACT);
-      add(WriteOperationType.DELETE_PARTITION);
-    }
-  };
+  public static final Set<WriteOperationType> SUPPORTED_OPERATIONS = CollectionUtils.createImmutableSet(
+      WriteOperationType.INSERT,
+      WriteOperationType.INSERT_PREPPED,
+      WriteOperationType.UPSERT,
+      WriteOperationType.UPSERT_PREPPED,
+      WriteOperationType.INSERT_OVERWRITE,
+      WriteOperationType.DELETE,
+      WriteOperationType.COMPACT,
+      WriteOperationType.DELETE_PARTITION
+  );
 
-  public HoodieBucketLayout(HoodieWriteConfig config) {
+  public HoodieSimpleBucketLayout(HoodieWriteConfig config) {
     super(config);
   }
 
@@ -55,6 +54,7 @@ public class HoodieBucketLayout extends HoodieStorageLayout {
     return true;
   }
 
+  @Override
   public Option<String> layoutPartitionerClass() {
     return config.contains(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME)
         ? Option.of(config.getString(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME.key()))
@@ -62,7 +62,7 @@ public class HoodieBucketLayout extends HoodieStorageLayout {
   }
 
   @Override
-  public boolean doesNotSupport(WriteOperationType operationType) {
-    return !SUPPORTED_OPERATIONS.contains(operationType);
+  public boolean writeOperationSupported(WriteOperationType operationType) {
+    return SUPPORTED_OPERATIONS.contains(operationType);
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieStorageLayout.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieStorageLayout.java
index a0a4eab463..36be1a8bef 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieStorageLayout.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieStorageLayout.java
@@ -48,7 +48,7 @@ public abstract class HoodieStorageLayout implements Serializable {
   /**
    * Determines if the operation is supported by the layout.
    */
-  public abstract boolean doesNotSupport(WriteOperationType operationType);
+  public abstract boolean writeOperationSupported(WriteOperationType operationType);
 
   public enum LayoutType {
     DEFAULT, BUCKET
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java
new file mode 100644
index 0000000000..31f33890ad
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.keygen.KeyGenUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class TestBucketIdentifier {
+
+  public static final String NESTED_COL_SCHEMA = "{\"type\":\"record\", \"name\":\"nested_col\",\"fields\": ["
+      + "{\"name\": \"prop1\",\"type\": \"string\"},{\"name\": \"prop2\", \"type\": \"long\"}]}";
+  public static final String EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
+      + "{\"name\": \"timestamp\",\"type\": \"long\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
+      + "{\"name\": \"ts_ms\", \"type\": \"string\"},"
+      + "{\"name\": \"pii_col\", \"type\": \"string\"},"
+      + "{\"name\": \"nested_col\",\"type\": "
+      + NESTED_COL_SCHEMA + "}"
+      + "]}";
+
+  public static GenericRecord getRecord() {
+    return getRecord(getNestedColRecord("val1", 10L));
+  }
+
+  public static GenericRecord getNestedColRecord(String prop1Value, Long prop2Value) {
+    GenericRecord nestedColRecord = new GenericData.Record(new Schema.Parser().parse(NESTED_COL_SCHEMA));
+    nestedColRecord.put("prop1", prop1Value);
+    nestedColRecord.put("prop2", prop2Value);
+    return nestedColRecord;
+  }
+
+  public static GenericRecord getRecord(GenericRecord nestedColRecord) {
+    GenericRecord record = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
+    record.put("timestamp", 4357686L);
+    record.put("_row_key", "key1");
+    record.put("ts_ms", "2020-03-21");
+    record.put("pii_col", "pi");
+    record.put("nested_col", nestedColRecord);
+    return record;
+  }
+
+  @Test
+  public void testBucketFileId() {
+    int[] ids = {0, 4, 8, 16, 32, 64, 128, 256, 512, 1000, 1024, 4096, 10000, 100000};
+    for (int id : ids) {
+      String bucketIdStr = BucketIdentifier.bucketIdStr(id);
+      String fileId = BucketIdentifier.newBucketFileIdPrefix(bucketIdStr);
+      assert BucketIdentifier.bucketIdFromFileId(fileId) == id;
+    }
+  }
+
+  @Test
+  public void testBucketIdWithSimpleRecordKey() {
+    String recordKeyField = "_row_key";
+    String indexKeyField = "_row_key";
+    GenericRecord record = getRecord();
+    HoodieRecord hoodieRecord = new HoodieAvroRecord(
+        new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField, false), ""), null);
+    int bucketId = BucketIdentifier.getBucketId(hoodieRecord, indexKeyField, 8);
+    assert bucketId == BucketIdentifier.getBucketId(
+        Arrays.asList(record.get(indexKeyField).toString()), 8);
+  }
+
+  @Test
+  public void testBucketIdWithComplexRecordKey() {
+    List<String> recordKeyField = Arrays.asList("_row_key", "ts_ms");
+    String indexKeyField = "_row_key";
+    GenericRecord record = getRecord();
+    HoodieRecord hoodieRecord = new HoodieAvroRecord(
+        new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField, false), ""), null);
+    int bucketId = BucketIdentifier.getBucketId(hoodieRecord, indexKeyField, 8);
+    assert bucketId == BucketIdentifier.getBucketId(
+        Arrays.asList(record.get(indexKeyField).toString()), 8);
+  }
+
+  @Test
+  public void testGetHashKeys() {
+    BucketIdentifier identifier = new BucketIdentifier();
+    List<String> keys = identifier.getHashKeys(new HoodieKey("abc", "partition"), "");
+    Assertions.assertEquals(1, keys.size());
+    Assertions.assertEquals("abc", keys.get(0));
+
+    keys = identifier.getHashKeys(new HoodieKey("f1:abc", "partition"), "f1");
+    Assertions.assertEquals(1, keys.size());
+    Assertions.assertEquals("abc", keys.get(0));
+
+    keys = identifier.getHashKeys(new HoodieKey("f1:abc,f2:bcd", "partition"), "f2");
+    Assertions.assertEquals(1, keys.size());
+    Assertions.assertEquals("bcd", keys.get(0));
+
+    keys = identifier.getHashKeys(new HoodieKey("f1:abc,f2:bcd", "partition"), "f1,f2");
+    Assertions.assertEquals(2, keys.size());
+    Assertions.assertEquals("abc", keys.get(0));
+    Assertions.assertEquals("bcd", keys.get(1));
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestConsistentBucketIdIdentifier.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestConsistentBucketIdIdentifier.java
new file mode 100644
index 0000000000..3ffe6ded18
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestConsistentBucketIdIdentifier.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASH_VALUE_MASK;
+
+/**
+ * Unit test of consistent bucket identifier
+ */
+public class TestConsistentBucketIdIdentifier {
+
+  @Test
+  public void testGetBucket() {
+    List<ConsistentHashingNode> nodes = Arrays.asList(
+        new ConsistentHashingNode(100, "0"),
+        new ConsistentHashingNode(0x2fffffff, "1"),
+        new ConsistentHashingNode(0x4fffffff, "2"));
+    HoodieConsistentHashingMetadata meta = new HoodieConsistentHashingMetadata((short) 0, "", "", 3, 0, nodes);
+    ConsistentBucketIdentifier identifier = new ConsistentBucketIdentifier(meta);
+
+    Assertions.assertEquals(3, identifier.getNumBuckets());
+
+    // Get bucket by hash keys
+    Assertions.assertEquals(nodes.get(2), identifier.getBucket(Arrays.asList("Hudi")));
+    Assertions.assertEquals(nodes.get(1), identifier.getBucket(Arrays.asList("bucket_index")));
+    Assertions.assertEquals(nodes.get(1), identifier.getBucket(Arrays.asList("consistent_hashing")));
+    Assertions.assertEquals(nodes.get(1), identifier.getBucket(Arrays.asList("bucket_index", "consistent_hashing")));
+    int[] ref1 = {2, 2, 1, 1, 0, 1, 1, 1, 0, 1};
+    int[] ref2 = {1, 0, 1, 0, 1, 1, 1, 0, 1, 2};
+    for (int i = 0; i < 10; ++i) {
+      Assertions.assertEquals(nodes.get(ref1[i]), identifier.getBucket(Arrays.asList(Integer.toString(i))));
+      Assertions.assertEquals(nodes.get(ref2[i]), identifier.getBucket(Arrays.asList(Integer.toString(i), Integer.toString(i + 1))));
+    }
+
+    // Get bucket by hash value
+    Assertions.assertEquals(nodes.get(0), identifier.getBucket(0));
+    Assertions.assertEquals(nodes.get(0), identifier.getBucket(50));
+    Assertions.assertEquals(nodes.get(0), identifier.getBucket(100));
+    Assertions.assertEquals(nodes.get(1), identifier.getBucket(101));
+    Assertions.assertEquals(nodes.get(1), identifier.getBucket(0x1fffffff));
+    Assertions.assertEquals(nodes.get(1), identifier.getBucket(0x2fffffff));
+    Assertions.assertEquals(nodes.get(2), identifier.getBucket(0x40000000));
+    Assertions.assertEquals(nodes.get(2), identifier.getBucket(0x40000001));
+    Assertions.assertEquals(nodes.get(2), identifier.getBucket(0x4fffffff));
+    Assertions.assertEquals(nodes.get(0), identifier.getBucket(0x50000000));
+    Assertions.assertEquals(nodes.get(0), identifier.getBucket(HASH_VALUE_MASK));
+
+    // Get bucket by file id
+    Assertions.assertEquals(nodes.get(0), identifier.getBucketByFileId(FSUtils.createNewFileId("0", 0)));
+    Assertions.assertEquals(nodes.get(1), identifier.getBucketByFileId(FSUtils.createNewFileId("1", 0)));
+    Assertions.assertEquals(nodes.get(2), identifier.getBucketByFileId(FSUtils.createNewFileId("2", 0)));
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 3b512f0bdc..df82e75db9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -179,7 +179,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
         initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime));
     table.validateInsertSchema();
     preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
-    HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.insert(context,instantTime, HoodieJavaRDD.of(records));
+    HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.insert(context, instantTime, HoodieJavaRDD.of(records));
     HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
     return postWrite(resultRDD, instantTime, table);
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
index 66edf607f8..0843dfc3c9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
@@ -112,6 +112,11 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
     return HoodieJavaRDD.of(rddData.mapPartitions(func::apply, preservesPartitioning));
   }
 
+  @Override
+  public <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>, Iterator<O>> func) {
+    return HoodieJavaRDD.of(rddData.mapPartitions(func::apply));
+  }
+
   @Override
   public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
     return HoodieJavaRDD.of(rddData.flatMap(e -> func.apply(e)));
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
index d1f40dca48..4525490c8d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
@@ -28,7 +28,8 @@ import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.bloom.HoodieBloomIndex;
 import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex;
 import org.apache.hudi.index.bloom.SparkHoodieBloomIndexHelper;
-import org.apache.hudi.index.bucket.HoodieBucketIndex;
+import org.apache.hudi.index.bucket.HoodieSimpleBucketIndex;
+import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
 import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex;
 import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex;
 import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex;
@@ -56,8 +57,6 @@ public final class SparkHoodieIndexFactory {
         return new SparkHoodieHBaseIndex(config);
       case INMEMORY:
         return new HoodieInMemoryHashIndex(config);
-      case BUCKET:
-        return new HoodieBucketIndex(config);
       case BLOOM:
         return new HoodieBloomIndex(config, SparkHoodieBloomIndexHelper.getInstance());
       case GLOBAL_BLOOM:
@@ -66,6 +65,15 @@ public final class SparkHoodieIndexFactory {
         return new HoodieSimpleIndex(config, getKeyGeneratorForSimpleIndex(config));
       case GLOBAL_SIMPLE:
         return new HoodieGlobalSimpleIndex(config, getKeyGeneratorForSimpleIndex(config));
+      case BUCKET:
+        switch (config.getBucketIndexEngineType()) {
+          case SIMPLE:
+            return new HoodieSimpleBucketIndex(config);
+          case CONSISTENT_HASHING:
+            return new HoodieSparkConsistentBucketIndex(config);
+          default:
+            throw new HoodieIndexException("Unknown bucket index engine type: " + config.getBucketIndexEngineType());
+        }
       default:
         throw new HoodieIndexException("Index type unspecified, set " + config.getIndexType());
     }
@@ -90,6 +98,8 @@ public final class SparkHoodieIndexFactory {
         return false;
       case GLOBAL_SIMPLE:
         return true;
+      case BUCKET:
+        return false;
       default:
         return createIndex(config).isGlobal();
     }
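
    To make the new dispatch concrete, here is a minimal sketch (not part of this commit) that
    builds a write config selecting the BUCKET index type with the CONSISTENT_HASHING engine and
    asks the factory for an index instance. The builder methods and class names are the ones used
    in the diffs of this commit; the base path is a placeholder and other write-config options are
    omitted for brevity.

        import org.apache.hudi.config.HoodieIndexConfig;
        import org.apache.hudi.config.HoodieWriteConfig;
        import org.apache.hudi.index.HoodieIndex;
        import org.apache.hudi.index.SparkHoodieIndexFactory;

        public class BucketEngineDispatchExample {
          public static void main(String[] args) {
            // Placeholder base path; mandatory options unrelated to indexing are omitted.
            HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
                .withPath("/tmp/hudi_trips")
                .withIndexConfig(HoodieIndexConfig.newBuilder()
                    .withIndexType(HoodieIndex.IndexType.BUCKET)
                    // SIMPLE would resolve to HoodieSimpleBucketIndex instead.
                    .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING)
                    .build())
                .build();

            // With the switch added above, BUCKET + CONSISTENT_HASHING yields HoodieSparkConsistentBucketIndex.
            HoodieIndex index = SparkHoodieIndexFactory.createIndex(config);
            System.out.println(index.getClass().getSimpleName());
          }
        }
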
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
new file mode 100644
index 0000000000..ca6bf0fc7d
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * Consistent hashing bucket index implementation, with auto-adjust bucket number.
+ * NOTE: bucket resizing is triggered by clustering.
+ */
+public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieSparkConsistentBucketIndex.class);
+
+  public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses,
+                                                HoodieEngineContext context,
+                                                HoodieTable hoodieTable)
+      throws HoodieIndexException {
+    return writeStatuses;
+  }
+
+  /**
+   * Do nothing.
+   * A failed write may create hashing metadata for a partition. Even in that case, we do nothing when rolling back
+   * the failed write, because the hashing metadata created by a writer always carries the 00000000000000 timestamp and can be viewed
+   * as the initialization of the partition rather than as part of the failed write.
+   */
+  @Override
+  public boolean rollbackCommit(String instantTime) {
+    return true;
+  }
+
+  @Override
+  protected BucketIndexLocationMapper getLocationMapper(HoodieTable table, List<String> partitionPath) {
+    return new ConsistentBucketIndexLocationMapper(table, partitionPath);
+  }
+
+  /**
+   * Load hashing metadata of the given partition; if it does not exist, create a new one (and persist it to storage).
+   *
+   * @param table     hoodie table
+   * @param partition table partition
+   * @return Consistent hashing metadata
+   */
+  public HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable table, String partition) {
+    HoodieConsistentHashingMetadata metadata = loadMetadata(table, partition);
+    if (metadata != null) {
+      return metadata;
+    }
+
+    // There is no metadata, so try to create a new one and save it.
+    metadata = new HoodieConsistentHashingMetadata(partition, numBuckets);
+    if (saveMetadata(table, metadata, false)) {
+      return metadata;
+    }
+
+    // The creation failed, so try loading the metadata again. A concurrent creation of the metadata should have succeeded.
+    // Note: the consistency problem of cloud storage is handled internally by HoodieWrapperFileSystem, i.e., ConsistencyGuard
+    metadata = loadMetadata(table, partition);
+    ValidationUtils.checkState(metadata != null, "Failed to load or create metadata, partition: " + partition);
+    return metadata;
+  }
+
+  /**
+   * Load hashing metadata of the given partition; if it does not exist, return null.
+   *
+   * @param table     hoodie table
+   * @param partition table partition
+   * @return Consistent hashing metadata or null if it does not exist
+   */
+  public static HoodieConsistentHashingMetadata loadMetadata(HoodieTable table, String partition) {
+    Path metadataPath = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), partition);
+
+    try {
+      if (!table.getMetaClient().getFs().exists(metadataPath)) {
+        return null;
+      }
+      FileStatus[] metaFiles = table.getMetaClient().getFs().listStatus(metadataPath);
+      final HoodieTimeline completedCommits = table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+      Predicate<FileStatus> metaFilePredicate = fileStatus -> {
+        String filename = fileStatus.getPath().getName();
+        if (!filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX)) {
+          return false;
+        }
+        String timestamp = HoodieConsistentHashingMetadata.getTimestampFromFile(filename);
+        return completedCommits.containsInstant(timestamp) || timestamp.equals(HoodieTimeline.INIT_INSTANT_TS);
+      };
+
+      // Get a valid hashing metadata with the largest (latest) timestamp
+      FileStatus metaFile = Arrays.stream(metaFiles).filter(metaFilePredicate)
+          .max(Comparator.comparing(a -> a.getPath().getName())).orElse(null);
+
+      if (metaFile == null) {
+        return null;
+      }
+
+      byte[] content = FileIOUtils.readAsByteArray(table.getMetaClient().getFs().open(metaFile.getPath()));
+      return HoodieConsistentHashingMetadata.fromBytes(content);
+    } catch (IOException e) {
+      LOG.error("Error when loading hashing metadata, partition: " + partition, e);
+      throw new HoodieIndexException("Error while loading hashing metadata", e);
+    }
+  }
+
+  /**
+   * Save metadata into storage
+   *
+   * @param table hoodie table
+   * @param metadata hashing metadata to be saved
+   * @param overwrite whether to overwrite existing metadata
+   * @return true if the metadata is saved successfully
+   */
+  private static boolean saveMetadata(HoodieTable table, HoodieConsistentHashingMetadata metadata, boolean overwrite) {
+    HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+    Path dir = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), metadata.getPartitionPath());
+    Path fullPath = new Path(dir, metadata.getFilename());
+    try (FSDataOutputStream fsOut = fs.create(fullPath, overwrite)) {
+      byte[] bytes = metadata.toBytes();
+      fsOut.write(bytes);
+      fsOut.close();
+      return true;
+    } catch (IOException e) {
+      LOG.warn("Failed to update bucket metadata: " + metadata, e);
+    }
+    return false;
+  }
+
+  public class ConsistentBucketIndexLocationMapper implements BucketIndexLocationMapper {
+
+    /**
+     * Mapping from partitionPath -> bucket identifier
+     */
+    private final Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
+
+    public ConsistentBucketIndexLocationMapper(HoodieTable table, List<String> partitions) {
+      // TODO maybe parallel
+      partitionToIdentifier = partitions.stream().collect(Collectors.toMap(p -> p, p -> {
+        HoodieConsistentHashingMetadata metadata = loadOrCreateMetadata(table, p);
+        return new ConsistentBucketIdentifier(metadata);
+      }));
+    }
+
+    @Override
+    public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key, String partitionPath) {
+      ConsistentHashingNode node = partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields);
+      if (!StringUtils.isNullOrEmpty(node.getFileIdPrefix())) {
+        /**
+         * The dynamic bucket index does not need the instant time of the latest file group.
+         * We append the suffix 0 to the file uuid, following the naming convention, i.e., fileId = [uuid]_[numWrites]
+         */
+        return Option.of(new HoodieRecordLocation(null, FSUtils.createNewFileId(node.getFileIdPrefix(), 0)));
+      }
+
+      LOG.error("Consistent hashing node has no file group, partition: " + partitionPath + ", meta: "
+          + partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", record_key: " + key.toString());
+      throw new HoodieIndexException("Failed to getBucket as hashing node has no file group");
+    }
+  }
+}
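
    For intuition, the sketch below re-implements in isolation what the location mapper above
    delegates to ConsistentBucketIdentifier: hash the record key onto the [0, HASH_VALUE_MASK]
    ring, pick the first node whose range end covers the hash, and turn its file id prefix into a
    HoodieRecordLocation. This is an illustrative approximation using only APIs added in this
    commit, not the actual ConsistentBucketIdentifier code; the hash seed of 0 and the partition
    path are assumptions.

        import java.util.List;

        import org.apache.hudi.common.fs.FSUtils;
        import org.apache.hudi.common.model.ConsistentHashingNode;
        import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
        import org.apache.hudi.common.model.HoodieRecordLocation;
        import org.apache.hudi.common.util.hash.HashID;

        public class ConsistentRingLookupSketch {

          public static void main(String[] args) {
            // Default metadata for one partition with 4 buckets; file group uuids are generated internally.
            HoodieConsistentHashingMetadata metadata = new HoodieConsistentHashingMetadata("2022/05/16", 4);
            System.out.println(locate(metadata, "record-key-001"));
          }

          static HoodieRecordLocation locate(HoodieConsistentHashingMetadata metadata, String recordKey) {
            // Hash the key onto the ring; seed 0 is an assumption made for this sketch.
            int hash = HashID.getXXHash32(recordKey, 0) & HoodieConsistentHashingMetadata.HASH_VALUE_MASK;

            List<ConsistentHashingNode> nodes = metadata.getNodes();
            ConsistentHashingNode owner = nodes.get(0); // defensive wrap-around default
            for (ConsistentHashingNode node : nodes) {
              if (hash <= node.getValue()) {            // first node whose range end covers the hash
                owner = node;
                break;
              }
            }
            // Same convention as getRecordLocation(...) above: no instant time is needed,
            // and the file id is the node's uuid prefix plus a trailing 0.
            return new HoodieRecordLocation(null, FSUtils.createNewFileId(owner.getFileIdPrefix(), 0));
          }
        }
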
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
new file mode 100644
index 0000000000..e0bc22f70d
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.RealtimeFileStatus;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
+import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Test consistent hashing index
+ */
+@Tag("functional")
+public class TestConsistentBucketIndex extends HoodieClientTestHarness {
+
+  private final Random random = new Random(1);
+  private HoodieIndex index;
+  private HoodieWriteConfig config;
+
+  private static Stream<Arguments> configParams() {
+    // populateMetaFields, partitioned
+    Object[][] data = new Object[][] {
+        {true, false},
+        {false, false},
+        {true, true},
+        {false, true},
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  private void setUp(boolean populateMetaFields, boolean partitioned) throws Exception {
+    initPath();
+    initSparkContexts();
+    if (partitioned) {
+      initTestDataGenerator();
+    } else {
+      initTestDataGenerator(new String[] {""});
+    }
+    initFileSystem();
+    Properties props = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
+    metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, props);
+    config = getConfigBuilder()
+        .withProperties(props)
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .fromProperties(props)
+            .withIndexType(HoodieIndex.IndexType.BUCKET)
+            .withIndexKeyField("_row_key")
+            .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING)
+            .build())
+        .withAutoCommit(false)
+        .build();
+    writeClient = getHoodieWriteClient(config);
+    index = writeClient.getIndex();
+  }
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    cleanupResources();
+  }
+
+  /**
+   * Test bucket index tagging (records are always tagged regardless of the write status).
+   * Also test tagging consistency: two tagging runs should produce the same locations.
+   *
+   * @param populateMetaFields whether meta fields are populated in the table
+   * @param partitioned        whether the table is partitioned
+   * @throws Exception
+   */
+  @ParameterizedTest
+  @MethodSource("configParams")
+  public void testTagLocation(boolean populateMetaFields, boolean partitioned) throws Exception {
+    setUp(populateMetaFields, partitioned);
+    String newCommitTime = "001";
+    int totalRecords = 20 + random.nextInt(20);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 2);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+
+    // The records should be tagged anyway, even though this is the first time tagging is done
+    List<HoodieRecord> taggedRecord = tagLocation(index, writeRecords, hoodieTable).collect();
+    Assertions.assertTrue(taggedRecord.stream().allMatch(r -> r.isCurrentLocationKnown()));
+
+    // Tag again, the records should get the same location (hashing metadata has been persisted after the first tagging)
+    List<HoodieRecord> taggedRecord2 = tagLocation(index, writeRecords, hoodieTable).collect();
+    for (HoodieRecord ref : taggedRecord) {
+      for (HoodieRecord record : taggedRecord2) {
+        if (ref.getRecordKey().equals(record.getRecordKey())) {
+          Assertions.assertEquals(ref.getCurrentLocation(), record.getCurrentLocation());
+          break;
+        }
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @MethodSource("configParams")
+  public void testWriteData(boolean populateMetaFields, boolean partitioned) throws Exception {
+    setUp(populateMetaFields, partitioned);
+    String newCommitTime = "001";
+    int totalRecords = 20 + random.nextInt(20);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 2);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    // Insert totalRecords records
+    writeClient.startCommitWithTime(newCommitTime);
+    List<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime).collect();
+    org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatues);
+    boolean success = writeClient.commitStats(newCommitTime, writeStatues.stream()
+        .map(WriteStatus::getStat)
+        .collect(Collectors.toList()), Option.empty(), metaClient.getCommitActionType());
+    Assertions.assertTrue(success);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    // The number of distinct fileIds should equal the total number of log files
+    Assertions.assertEquals(writeStatues.stream().map(WriteStatus::getFileId).distinct().count(),
+        Arrays.stream(dataGen.getPartitionPaths()).mapToInt(p -> Objects.requireNonNull(listStatus(p, true)).length).sum());
+    Assertions.assertEquals(totalRecords, readRecords(dataGen.getPartitionPaths(), populateMetaFields).size());
+
+    // Upsert the same set of records; the record count should stay the same
+    newCommitTime = "002";
+    writeClient.startCommitWithTime(newCommitTime);
+    writeStatues = writeClient.upsert(writeRecords, newCommitTime).collect();
+    org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatues);
+    success = writeClient.commitStats(newCommitTime, writeStatues.stream()
+        .map(WriteStatus::getStat)
+        .collect(Collectors.toList()), Option.empty(), metaClient.getCommitActionType());
+    Assertions.assertTrue(success);
+    // The number of log files should double after this second write
+    long numberOfLogFiles = Arrays.stream(dataGen.getPartitionPaths())
+        .mapToInt(p -> {
+          return Arrays.stream(listStatus(p, true)).mapToInt(fs ->
+              fs instanceof RealtimeFileStatus ? ((RealtimeFileStatus) fs).getDeltaLogFiles().size() : 1).sum();
+        }).sum();
+    Assertions.assertEquals(writeStatues.stream().map(WriteStatus::getFileId).distinct().count() * 2, numberOfLogFiles);
+    // The record count should remain the same because of deduplication
+    Assertions.assertEquals(totalRecords, readRecords(dataGen.getPartitionPaths(), populateMetaFields).size());
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    // Upsert new set of records, and validate the total number of records
+    newCommitTime = "003";
+    records = dataGen.generateInserts(newCommitTime, totalRecords);
+    writeRecords = jsc.parallelize(records, 2);
+    writeClient.startCommitWithTime(newCommitTime);
+    writeStatues = writeClient.upsert(writeRecords, newCommitTime).collect();
+    org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatues);
+    success = writeClient.commitStats(newCommitTime, writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(), metaClient.getCommitActionType());
+    Assertions.assertTrue(success);
+    Assertions.assertEquals(totalRecords * 2, readRecords(dataGen.getPartitionPaths(), populateMetaFields).size());
+  }
+
+  private List<GenericRecord> readRecords(String[] partitions, boolean populateMetaFields) {
+    return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf,
+        Arrays.stream(partitions).map(p -> Paths.get(basePath, p).toString()).collect(Collectors.toList()),
+        basePath, new JobConf(hadoopConf), true, populateMetaFields);
+  }
+
+  private FileStatus[] listStatus(String p, boolean realtime) {
+    JobConf jobConf = new JobConf(hadoopConf);
+    FileInputFormat.setInputPaths(jobConf, Paths.get(basePath, p).toString());
+    FileInputFormat format = HoodieInputFormatUtils.getInputFormat(HoodieFileFormat.PARQUET, realtime, jobConf);
+    try {
+      if (realtime) {
+        return ((HoodieParquetRealtimeInputFormat) format).listStatus(jobConf);
+      } else {
+        return ((HoodieParquetInputFormat) format).listStatus(jobConf);
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+      return null;
+    }
+  }
+
+  private HoodieWriteConfig.Builder getConfigBuilder() {
+    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
+        .withWriteStatusClass(MetadataMergeWriteStatus.class)
+        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
+        .forTable("test-trip-table")
+        .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
index 024cf1ff50..8cbb74e6f5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
@@ -131,6 +131,9 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
     HoodieIndexConfig.Builder indexBuilder = HoodieIndexConfig.newBuilder().withIndexType(indexType)
         .fromProperties(populateMetaFields ? new Properties() : getPropertiesForKeyGen())
         .withIndexType(indexType);
+    if (indexType == IndexType.BUCKET) {
+      indexBuilder.withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE);
+    }
     config = getConfigBuilder()
         .withProperties(populateMetaFields ? new Properties() : getPropertiesForKeyGen())
         .withRollbackUsingMarkers(rollbackUsingMarkers)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java
index 171403eb03..b843546799 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndexConfigs.java
@@ -26,7 +26,8 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.index.bloom.HoodieBloomIndex;
 import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex;
-import org.apache.hudi.index.bucket.HoodieBucketIndex;
+import org.apache.hudi.index.bucket.HoodieSimpleBucketIndex;
+import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
 import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex;
 import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex;
 import org.apache.hudi.index.simple.HoodieSimpleIndex;
@@ -88,8 +89,15 @@ public class TestHoodieIndexConfigs {
         break;
       case BUCKET:
         config = clientConfigBuilder.withPath(basePath)
-            .withIndexConfig(indexConfigBuilder.withIndexType(IndexType.BUCKET).build()).build();
-        assertTrue(SparkHoodieIndexFactory.createIndex(config) instanceof HoodieBucketIndex);
+            .withIndexConfig(indexConfigBuilder.withIndexType(IndexType.BUCKET)
+                .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE).build()).build();
+        assertTrue(SparkHoodieIndexFactory.createIndex(config) instanceof HoodieSimpleBucketIndex);
+
+        config = clientConfigBuilder.withPath(basePath)
+            .withIndexConfig(indexConfigBuilder.withIndexType(IndexType.BUCKET)
+              .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING).build())
+            .build();
+        assertTrue(SparkHoodieIndexFactory.createIndex(config) instanceof HoodieSparkConsistentBucketIndex);
         break;
       default:
         // no -op. just for checkstyle errors
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieBucketIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java
similarity index 91%
rename from hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieBucketIndex.java
rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java
index 2b3765948b..c8b877ceca 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieBucketIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bucket/TestHoodieSimpleBucketIndex.java
@@ -52,10 +52,10 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class TestHoodieBucketIndex extends HoodieClientTestHarness {
+public class TestHoodieSimpleBucketIndex extends HoodieClientTestHarness {
 
-  private static final Logger LOG = LogManager.getLogger(TestHoodieBucketIndex.class);
-  private static final Schema SCHEMA = getSchemaFromResource(TestHoodieBucketIndex.class, "/exampleSchema.avsc", true);
+  private static final Logger LOG = LogManager.getLogger(TestHoodieSimpleBucketIndex.class);
+  private static final Schema SCHEMA = getSchemaFromResource(TestHoodieSimpleBucketIndex.class, "/exampleSchema.avsc", true);
   private static final int NUM_BUCKET = 8;
 
   @BeforeEach
@@ -78,11 +78,15 @@ public class TestHoodieBucketIndex extends HoodieClientTestHarness {
     props.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key(), "_row_key");
     assertThrows(HoodieIndexException.class, () -> {
       HoodieIndexConfig.newBuilder().fromProperties(props)
-          .withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("8").build();
+          .withIndexType(HoodieIndex.IndexType.BUCKET)
+          .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
+          .withBucketNum("8").build();
     });
     props.setProperty(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key(), "uuid");
     HoodieIndexConfig.newBuilder().fromProperties(props)
-        .withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("8").build();
+        .withIndexType(HoodieIndex.IndexType.BUCKET)
+        .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
+        .withBucketNum("8").build();
   }
 
   @Test
@@ -110,7 +114,7 @@ public class TestHoodieBucketIndex extends HoodieClientTestHarness {
 
     HoodieWriteConfig config = makeConfig();
     HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
-    HoodieBucketIndex bucketIndex = new HoodieBucketIndex(config);
+    HoodieSimpleBucketIndex bucketIndex = new HoodieSimpleBucketIndex(config);
     HoodieData<HoodieRecord<HoodieAvroRecord>> taggedRecordRDD = bucketIndex.tagLocation(HoodieJavaRDD.of(recordRDD), context, table);
     assertFalse(taggedRecordRDD.collectAsList().stream().anyMatch(r -> r.isCurrentLocationKnown()));
 
@@ -133,6 +137,7 @@ public class TestHoodieBucketIndex extends HoodieClientTestHarness {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(SCHEMA.toString())
         .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props)
             .withIndexType(HoodieIndex.IndexType.BUCKET)
+            .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
             .withIndexKeyField("_row_key")
             .withBucketNum(String.valueOf(NUM_BUCKET)).build()).build();
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index 9574d35a65..30f7ad6654 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -148,7 +148,10 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
     props.putAll(indexConfig.build().getProps());
     if (indexType.equals(HoodieIndex.IndexType.BUCKET)) {
       props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
-      indexConfig.fromProperties(props).withIndexKeyField("_row_key").withBucketNum("1");
+      indexConfig.fromProperties(props)
+          .withIndexKeyField("_row_key")
+          .withBucketNum("1")
+          .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE);
       props.putAll(indexConfig.build().getProps());
       props.putAll(HoodieLayoutConfig.newBuilder().fromProperties(props)
           .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
index 4e8d2b7ece..4b391ecbab 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
@@ -77,6 +77,15 @@ public abstract class HoodieData<T> implements Serializable {
   public abstract <O> HoodieData<O> mapPartitions(
       SerializableFunction<Iterator<T>, Iterator<O>> func, boolean preservesPartitioning);
 
+  /**
+   * @param func                  serializable map function that takes a partition of objects
+   *                              (as an iterator) and produces an iterator of outputs.
+   * @param <O>                   output object type.
+   * @return {@link HoodieData<O>} containing the result. Actual execution may be deferred.
+   */
+  public abstract <O> HoodieData<O> mapPartitions(
+      SerializableFunction<Iterator<T>, Iterator<O>> func);
+
   /**
    * @param func serializable flatmap function.
    * @param <O>  output object type.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
index c23e712cf4..28ed2e282d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
@@ -99,6 +99,11 @@ public class HoodieList<T> extends HoodieData<T> {
 
   @Override
   public <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>, Iterator<O>> func, boolean preservesPartitioning) {
+    return mapPartitions(func);
+  }
+
+  @Override
+  public <O> HoodieData<O> mapPartitions(SerializableFunction<Iterator<T>, Iterator<O>> func) {
     List<O> result = new ArrayList<>();
     throwingMapWrapper(func).apply(listData.iterator()).forEachRemaining(result::add);
     return HoodieList.of(result);
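
    A small usage sketch (not from the commit) of the new single-argument mapPartitions, using the
    in-memory HoodieList implementation shown above; the Spark-backed HoodieJavaRDD behaves the same
    way but may defer execution.

        import java.util.Arrays;
        import java.util.Collections;

        import org.apache.hudi.common.data.HoodieData;
        import org.apache.hudi.common.data.HoodieList;

        public class MapPartitionsExample {
          public static void main(String[] args) {
            HoodieData<Integer> data = HoodieList.of(Arrays.asList(1, 2, 3, 4, 5));

            // Collapse each partition (the whole list, for HoodieList) into a single sum.
            HoodieData<Integer> sums = data.mapPartitions(iter -> {
              int sum = 0;
              while (iter.hasNext()) {
                sum += iter.next();
              }
              return Collections.singletonList(sum).iterator();
            });

            System.out.println(sums.collectAsList()); // [15]
          }
        }
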
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 79badb48a5..aa0cadf5b9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -348,6 +348,10 @@ public class FSUtils {
     return UUID.randomUUID().toString();
   }
 
+  public static String createNewFileId(String idPfx, int id) {
+    return String.format("%s-%d", idPfx, id);
+  }
+
   /**
    * Get the file extension from the log file.
    */
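
    A quick illustration of the new helper: given a file group uuid prefix and a write index, it
    produces the fileId string consumed by the consistent hashing location mapper. Sketch only; the
    uuid below is a placeholder.

        import org.apache.hudi.common.fs.FSUtils;

        public class CreateNewFileIdExample {
          public static void main(String[] args) {
            // "%s-%d" formatting; the uuid here is a placeholder.
            System.out.println(FSUtils.createNewFileId("d1a2f3b4-0000-0000-0000-000000000000", 0));
            // => d1a2f3b4-0000-0000-0000-000000000000-0
          }
        }
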
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/ConsistentHashingNode.java b/hudi-common/src/main/java/org/apache/hudi/common/model/ConsistentHashingNode.java
new file mode 100644
index 0000000000..262bb96322
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/ConsistentHashingNode.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.util.JsonUtils;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Node of the hash ring used by the consistent hashing index.
+ * Each node records the upper bound of its hash range and the corresponding file group id.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ConsistentHashingNode implements Serializable {
+
+  private final int value;
+  private final String fileIdPrefix;
+
+  @JsonCreator
+  public ConsistentHashingNode(@JsonProperty("value") int value, @JsonProperty("fileIdPrefix") String fileIdPrefix) {
+    this.value = value;
+    this.fileIdPrefix = fileIdPrefix;
+  }
+
+  public static String toJsonString(List<ConsistentHashingNode> nodes) throws IOException {
+    return JsonUtils.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(nodes);
+  }
+
+  public static List<ConsistentHashingNode> fromJsonString(String json) throws Exception {
+    if (json == null || json.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    ConsistentHashingNode[] nodes = JsonUtils.getObjectMapper().readValue(json, ConsistentHashingNode[].class);
+    return Arrays.asList(nodes);
+  }
+
+  public int getValue() {
+    return value;
+  }
+
+  public String getFileIdPrefix() {
+    return fileIdPrefix;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("ConsistentHashingNode{");
+    sb.append("value=").append(value);
+    sb.append(", fileIdPfx='").append(fileIdPrefix).append('\'');
+    sb.append('}');
+    return sb.toString();
+  }
+}
\ No newline at end of file
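
    A minimal round-trip sketch for the node serialization helpers above; the range ends and fileId
    prefixes are arbitrary values chosen for illustration.

        import java.util.Arrays;
        import java.util.List;

        import org.apache.hudi.common.model.ConsistentHashingNode;

        public class ConsistentHashingNodeJsonExample {
          public static void main(String[] args) throws Exception {
            List<ConsistentHashingNode> nodes = Arrays.asList(
                new ConsistentHashingNode(1000, "uuid-a"),   // arbitrary range end and fileId prefix
                new ConsistentHashingNode(2000, "uuid-b"));

            String json = ConsistentHashingNode.toJsonString(nodes);
            List<ConsistentHashingNode> restored = ConsistentHashingNode.fromJsonString(json);

            // The deserialized nodes preserve both the range end value and the fileId prefix.
            System.out.println(restored.get(0).getValue() + " -> " + restored.get(0).getFileIdPrefix());
          }
        }
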
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 53ceb00409..f5077dea85 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -18,17 +18,15 @@
 
 package org.apache.hudi.common.model;
 
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.JsonUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.PropertyAccessor;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -227,7 +225,7 @@ public class HoodieCommitMetadata implements Serializable {
       LOG.info("partition path is null for " + partitionToWriteStats.get(null));
       partitionToWriteStats.remove(null);
     }
-    return getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this);
+    return JsonUtils.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this);
   }
 
   public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Exception {
@@ -235,7 +233,7 @@ public class HoodieCommitMetadata implements Serializable {
       // For empty commit file (no data or somethings bad happen).
       return clazz.newInstance();
     }
-    return getObjectMapper().readValue(jsonStr, clazz);
+    return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
   }
 
   // Here the functions are named "fetch" instead of "get", to get avoid of the json conversion.
@@ -457,13 +455,6 @@ public class HoodieCommitMetadata implements Serializable {
     }
   }
 
-  protected static ObjectMapper getObjectMapper() {
-    ObjectMapper mapper = new ObjectMapper();
-    mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
-    mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
-    return mapper;
-  }
-
   @Override
   public String toString() {
     return "HoodieCommitMetadata{" + "partitionToWriteStats=" + partitionToWriteStats
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java
new file mode 100644
index 0000000000..46f1152627
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.JsonUtils;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * All the metadata used by the consistent hashing bucket index
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class HoodieConsistentHashingMetadata implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieConsistentHashingMetadata.class);
+  /**
+   * Upper-bound of the hash value
+   */
+  public static final int HASH_VALUE_MASK = Integer.MAX_VALUE;
+  public static final String HASHING_METADATA_FILE_SUFFIX = ".hashing_meta";
+
+  private final short version;
+  private final String partitionPath;
+  private final String instant;
+  private final int numBuckets;
+  private final int seqNo;
+  private final List<ConsistentHashingNode> nodes;
+
+  @JsonCreator
+  public HoodieConsistentHashingMetadata(@JsonProperty("version") short version, @JsonProperty("partitionPath") String partitionPath,
+                                         @JsonProperty("instant") String instant, @JsonProperty("numBuckets") int numBuckets,
+                                         @JsonProperty("seqNo") int seqNo, @JsonProperty("nodes") List<ConsistentHashingNode> nodes) {
+    this.version = version;
+    this.partitionPath = partitionPath;
+    this.instant = instant;
+    this.numBuckets = numBuckets;
+    this.seqNo = seqNo;
+    this.nodes = nodes;
+  }
+
+  /**
+   * Construct default metadata with every bucket's file group uuid initialized
+   */
+  public HoodieConsistentHashingMetadata(String partitionPath, int numBuckets) {
+    this((short) 0, partitionPath, HoodieTimeline.INIT_INSTANT_TS, numBuckets, 0, constructDefaultHashingNodes(numBuckets));
+  }
+
+  private static List<ConsistentHashingNode> constructDefaultHashingNodes(int numBuckets) {
+    long step = ((long) HASH_VALUE_MASK + numBuckets - 1) / numBuckets;
+    return IntStream.range(1, numBuckets + 1)
+        .mapToObj(i -> new ConsistentHashingNode((int) Math.min(step * i, HASH_VALUE_MASK), FSUtils.createNewFileIdPfx())).collect(Collectors.toList());
+  }
+
+  public short getVersion() {
+    return version;
+  }
+
+  public String getPartitionPath() {
+    return partitionPath;
+  }
+
+  public String getInstant() {
+    return instant;
+  }
+
+  public int getNumBuckets() {
+    return numBuckets;
+  }
+
+  public int getSeqNo() {
+    return seqNo;
+  }
+
+  public List<ConsistentHashingNode> getNodes() {
+    return nodes;
+  }
+
+  public String getFilename() {
+    return instant + HASHING_METADATA_FILE_SUFFIX;
+  }
+
+  public byte[] toBytes() throws IOException {
+    return toJsonString().getBytes(StandardCharsets.UTF_8);
+  }
+
+  public static HoodieConsistentHashingMetadata fromBytes(byte[] bytes) throws IOException {
+    try {
+      return fromJsonString(new String(bytes, StandardCharsets.UTF_8), HoodieConsistentHashingMetadata.class);
+    } catch (Exception e) {
+      throw new IOException("unable to read hashing metadata", e);
+    }
+  }
+
+  private String toJsonString() throws IOException {
+    return JsonUtils.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this);
+  }
+
+  protected static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Exception {
+    if (jsonStr == null || jsonStr.isEmpty()) {
+      // For an empty metadata file (no data or something bad happened).
+      return clazz.newInstance();
+    }
+    return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
+  }
+
+  /**
+   * Get the instant time from the hashing metadata filename.
+   * Filename pattern: <instant> + HASHING_METADATA_FILE_SUFFIX, e.g., 0000.hashing_meta
+   */
+  public static String getTimestampFromFile(String filename) {
+    return filename.split("\\.")[0];
+  }
+}
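
    To see what the default construction above produces, the sketch below builds metadata for a
    partition with 4 buckets and prints the evenly spaced range ends (the last one being
    HASH_VALUE_MASK) together with the 00000000000000-prefixed filename. The partition path is
    arbitrary.

        import org.apache.hudi.common.model.ConsistentHashingNode;
        import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;

        public class DefaultHashingMetadataExample {
          public static void main(String[] args) {
            HoodieConsistentHashingMetadata metadata = new HoodieConsistentHashingMetadata("2022/05/16", 4);

            // Each node owns the hash range (previous value, value]; the last range end is HASH_VALUE_MASK.
            for (ConsistentHashingNode node : metadata.getNodes()) {
              System.out.println(node.getValue() + " -> " + node.getFileIdPrefix());
            }

            // Initial metadata uses the INIT_INSTANT_TS timestamp, so the filename is 00000000000000.hashing_meta.
            System.out.println(metadata.getFilename());
          }
        }
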
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java
index 7cc9ee3a0c..2dd6cda47d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java
@@ -18,11 +18,9 @@
 
 package org.apache.hudi.common.model;
 
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import org.apache.hudi.common.util.JsonUtils;
+
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.PropertyAccessor;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -80,7 +78,7 @@ public class HoodieReplaceCommitMetadata extends HoodieCommitMetadata {
       LOG.info("partition path is null for " + partitionToReplaceFileIds.get(null));
       partitionToReplaceFileIds.remove(null);
     }
-    return getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this);
+    return JsonUtils.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this);
   }
 
   public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Exception {
@@ -88,7 +86,7 @@ public class HoodieReplaceCommitMetadata extends HoodieCommitMetadata {
       // For empty commit file (no data or somethings bad happen).
       return clazz.newInstance();
     }
-    return getObjectMapper().readValue(jsonStr, clazz);
+    return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
   }
 
   @Override
@@ -124,13 +122,6 @@ public class HoodieReplaceCommitMetadata extends HoodieCommitMetadata {
     }
   }
 
-  protected static ObjectMapper getObjectMapper() {
-    ObjectMapper mapper = new ObjectMapper();
-    mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
-    mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
-    return mapper;
-  }
-
   @Override
   public String toString() {
     return "HoodieReplaceMetadata{" + "partitionToWriteStats=" + partitionToWriteStats
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRollingStatMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRollingStatMetadata.java
index a354092675..0a5240ed55 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRollingStatMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRollingStatMetadata.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.common.model;
 
+import org.apache.hudi.common.util.JsonUtils;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -81,7 +83,7 @@ public class HoodieRollingStatMetadata implements Serializable {
       LOG.info("partition path is null for " + partitionToRollingStats.get(null));
       partitionToRollingStats.remove(null);
     }
-    return HoodieCommitMetadata.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this);
+    return JsonUtils.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this);
   }
 
   public HoodieRollingStatMetadata merge(HoodieRollingStatMetadata rollingStatMetadata) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 6b10a62820..546ddf7a30 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -85,6 +85,7 @@ public class HoodieTableMetaClient implements Serializable {
   public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + Path.SEPARATOR + ".bootstrap";
   public static final String HEARTBEAT_FOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".heartbeat";
   public static final String METADATA_TABLE_FOLDER_PATH = METAFOLDER_NAME + Path.SEPARATOR + "metadata";
+  public static final String HASHING_METADATA_FOLDER_NAME = ".bucket_index" + Path.SEPARATOR + "consistent_hashing_metadata";
   public static final String BOOTSTRAP_INDEX_BY_PARTITION_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH
       + Path.SEPARATOR + ".partitions";
   public static final String BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + Path.SEPARATOR
@@ -211,6 +212,13 @@ public class HoodieTableMetaClient implements Serializable {
     return new Path(metaPath.get(), SCHEMA_FOLDER_NAME).toString();
   }
 
+  /**
+   * @return Hashing metadata base path
+   */
+  public String getHashingMetadataPath() {
+    return new Path(metaPath.get(), HASHING_METADATA_FOLDER_NAME).toString();
+  }
+
   /**
    * @return Temp Folder path
    */
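
    As a result, the per-partition hashing metadata is expected to land under (layout sketch,
    <base>, <partition> and <instant> are placeholders):

        <base>/.hoodie/.bucket_index/consistent_hashing_metadata/<partition>/<instant>.hashing_meta

    which is the directory that loadMetadata/saveMetadata in HoodieSparkConsistentBucketIndex
    resolve via getHashingMetadataPath() and FSUtils.getPartitionPath(...).
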
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/JsonUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/JsonUtils.java
new file mode 100644
index 0000000000..d820bde178
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/JsonUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class JsonUtils {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  static {
+    MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+    MAPPER.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+  }
+
+  public static ObjectMapper getObjectMapper() {
+    return MAPPER;
+  }
+}
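
    A one-line sketch showing the shared, pre-configured mapper that replaces the per-class
    getObjectMapper() helpers removed elsewhere in this commit; the payload object is arbitrary.

        import org.apache.hudi.common.model.ConsistentHashingNode;
        import org.apache.hudi.common.util.JsonUtils;

        public class JsonUtilsExample {
          public static void main(String[] args) throws Exception {
            // FAIL_ON_UNKNOWN_PROPERTIES is disabled and field visibility is ANY, as configured above.
            String json = JsonUtils.getObjectMapper()
                .writerWithDefaultPrettyPrinter()
                .writeValueAsString(new ConsistentHashingNode(42, "uuid-placeholder"));
            System.out.println(json);
          }
        }
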
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java b/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java
index c56d760978..ccb29dfbb5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/hash/HashID.java
@@ -106,6 +106,15 @@ public class HashID implements Serializable {
     }
   }
 
+  public static int getXXHash32(final String message, int hashSeed) {
+    return getXXHash32(message.getBytes(StandardCharsets.UTF_8), hashSeed);
+  }
+
+  public static int getXXHash32(final byte[] message, int hashSeed) {
+    XXHashFactory factory = XXHashFactory.fastestInstance();
+    return factory.hash32().hash(message, 0, message.length, hashSeed);
+  }
+
   private static byte[] getXXHash(final byte[] message, final Size bits) {
     XXHashFactory factory = XXHashFactory.fastestInstance();
     switch (bits) {
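
    A short sketch of the new xxHash overloads; masking with HASH_VALUE_MASK (as the consistent
    hashing index does) keeps the ring position non-negative. The seed value is arbitrary here.

        import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
        import org.apache.hudi.common.util.hash.HashID;

        public class XXHash32Example {
          public static void main(String[] args) {
            int seed = 0; // arbitrary seed for this sketch
            int raw = HashID.getXXHash32("record-key-001", seed);
            int onRing = raw & HoodieConsistentHashingMetadata.HASH_VALUE_MASK;
            System.out.println(raw + " -> " + onRing);
          }
        }
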
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieLayoutFactory.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieConsistentHashingMetadata.java
similarity index 57%
copy from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieLayoutFactory.java
copy to hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieConsistentHashingMetadata.java
index e86d253df4..8aa2e65561 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/storage/HoodieLayoutFactory.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieConsistentHashingMetadata.java
@@ -16,23 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.storage;
+package org.apache.hudi.common.model;
 
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
-/**
- * A factory to generate layout.
- */
-public final class HoodieLayoutFactory {
-  public static HoodieStorageLayout createLayout(HoodieWriteConfig config) {
-    switch (config.getLayoutType()) {
-      case DEFAULT:
-        return new HoodieDefaultLayout(config);
-      case BUCKET:
-        return new HoodieBucketLayout(config);
-      default:
-        throw new HoodieNotSupportedException("Unknown layout type, set " + config.getLayoutType());
-    }
+public class TestHoodieConsistentHashingMetadata {
+
+  @Test
+  public void testGetTimestamp() {
+    Assertions.assertTrue(HoodieConsistentHashingMetadata.getTimestampFromFile("0000.hashing_metadata").equals("0000"));
+    Assertions.assertTrue(HoodieConsistentHashingMetadata.getTimestampFromFile("1234.hashing_metadata").equals("1234"));
   }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index 311c131d43..dc64856d3c 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -66,6 +66,10 @@ public class HoodieCommonTestHarness {
     dataGen = new HoodieTestDataGenerator();
   }
 
+  protected void initTestDataGenerator(String[] partitionPaths) {
+    dataGen = new HoodieTestDataGenerator(partitionPaths);
+  }
+
   /**
    * Cleanups test data generator.
    *
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java
deleted file mode 100644
index 4491a74fa6..0000000000
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/index/bucket/TestBucketIdentifier.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.index.bucket;
-
-import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.keygen.KeyGenUtils;
-import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
-
-import org.apache.avro.generic.GenericRecord;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class TestBucketIdentifier {
-
-  @Test
-  public void testBucketFileId() {
-    for (int i = 0; i < 1000; i++) {
-      String bucketId = BucketIdentifier.bucketIdStr(i);
-      String fileId = BucketIdentifier.newBucketFileIdPrefix(bucketId);
-      assert BucketIdentifier.bucketIdFromFileId(fileId) == i;
-    }
-  }
-
-  @Test
-  public void testBucketIdWithSimpleRecordKey() {
-    String recordKeyField = "_row_key";
-    String indexKeyField = "_row_key";
-    GenericRecord record = KeyGeneratorTestUtilities.getRecord();
-    HoodieRecord hoodieRecord = new HoodieAvroRecord(
-        new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField, false), ""), null);
-    int bucketId = BucketIdentifier.getBucketId(hoodieRecord, indexKeyField, 8);
-    assert bucketId == BucketIdentifier.getBucketId(
-        Arrays.asList(record.get(indexKeyField).toString()), 8);
-  }
-
-  @Test
-  public void testBucketIdWithComplexRecordKey() {
-    List<String> recordKeyField = Arrays.asList("_row_key","ts_ms");
-    String indexKeyField = "_row_key";
-    GenericRecord record = KeyGeneratorTestUtilities.getRecord();
-    HoodieRecord hoodieRecord = new HoodieAvroRecord(
-        new HoodieKey(KeyGenUtils.getRecordKey(record, recordKeyField, false), ""), null);
-    int bucketId = BucketIdentifier.getBucketId(hoodieRecord, indexKeyField, 8);
-    assert bucketId == BucketIdentifier.getBucketId(
-        Arrays.asList(record.get(indexKeyField).toString()), 8);
-  }
-}