You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/05/05 23:59:25 UTC

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #4480: [HUDI-3123] consistent hashing index: basic write path (upsert/insert)

alexeykudinkin commented on code in PR #4480:
URL: https://github.com/apache/hudi/pull/4480#discussion_r866314882


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java:
##########
@@ -22,41 +22,50 @@
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-public class BucketIdentifier {
-  // compatible with the spark bucket name
+public class BucketIdentifier implements Serializable {
+  // Compatible with the spark bucket name
   private static final Pattern BUCKET_NAME = Pattern.compile(".*_(\\d+)(?:\\..*)?$");
 
   public static int getBucketId(HoodieRecord record, String indexKeyFields, int numBuckets) {
     return getBucketId(record.getKey(), indexKeyFields, numBuckets);
   }
 
   public static int getBucketId(HoodieKey hoodieKey, String indexKeyFields, int numBuckets) {
-    return getBucketId(hoodieKey.getRecordKey(), indexKeyFields, numBuckets);
+    return (getHashKeys(hoodieKey, indexKeyFields).hashCode() & Integer.MAX_VALUE) % numBuckets;
   }
 
   public static int getBucketId(String recordKey, String indexKeyFields, int numBuckets) {
-    List<String> hashKeyFields;
+    return (getHashKeys(recordKey, indexKeyFields).hashCode() & Integer.MAX_VALUE) % numBuckets;
+  }
+
+  public static List<String> getHashKeys(HoodieKey hoodieKey, String indexKeyFields) {
+    return getHashKeys(hoodieKey.getRecordKey(), indexKeyFields);
+  }
+
+  protected static List<String> getHashKeys(String recordKey, String indexKeyFields) {
+    List<String> hashKeys;
     if (!recordKey.contains(":")) {
-      hashKeyFields = Collections.singletonList(recordKey);
+      hashKeys = Collections.singletonList(recordKey);
     } else {
       Map<String, String> recordKeyPairs = Arrays.stream(recordKey.split(","))
           .map(p -> p.split(":"))
           .collect(Collectors.toMap(p -> p[0], p -> p[1]));
-      hashKeyFields = Arrays.stream(indexKeyFields.split(","))
+      hashKeys = Arrays.stream(indexKeyFields.split(","))
           .map(f -> recordKeyPairs.get(f))
           .collect(Collectors.toList());
     }
-    return (hashKeyFields.hashCode() & Integer.MAX_VALUE) % numBuckets;
+    return hashKeys;
   }
 
-  // only for test
+  // Only for test

Review Comment:
   Why do we need method used only for tests?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.hash.HashID;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class ConsistentBucketIdentifier extends BucketIdentifier {
+
+  /**
+   * Hashing metadata of a partition
+   */
+  private final HoodieConsistentHashingMetadata metadata;
+  /**
+   * In-memory structure to speed up ring mapping (hashing value -> hashing node)
+   */
+  private final TreeMap<Integer, ConsistentHashingNode> ring;
+  /**
+   * Mapping from fileId -> hashing node
+   */
+  private final Map<String, ConsistentHashingNode> fileIdToBucket;
+
+  public ConsistentBucketIdentifier(HoodieConsistentHashingMetadata metadata) {
+    this.metadata = metadata;
+    this.fileIdToBucket = new HashMap<>();
+    this.ring = new TreeMap<>();
+    initialize();
+  }
+
+  public Collection<ConsistentHashingNode> getNodes() {
+    return ring.values();
+  }
+
+  public HoodieConsistentHashingMetadata getMetadata() {
+    return metadata;
+  }
+
+  public int getNumBuckets() {
+    return getNodes().size();

Review Comment:
   Can do `ring.size()` directly to avoid additional allocations



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+/**
+ * Consistent hashing bucket index implementation, with auto-adjust bucket number.
+ * NOTE: bucket resizing is triggered by clustering.
+ */
+public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieSparkConsistentBucketIndex.class);
+
+  private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
+
+  public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException {
+    return writeStatuses;
+  }
+
+  /**
+   * Do nothing.
+   * A failed write may create a hashing metadata for a partition. In this case, we still do nothing when rolling back
+   * the failed write. Because the hashing metadata created by a writer must have 00000000000000 timestamp and can be viewed
+   * as the initialization of a partition rather than as a part of the failed write.
+   *
+   * @param instantTime
+   * @return
+   */
+  @Override
+  public boolean rollbackCommit(String instantTime) {
+    return true;
+  }
+
+  /**
+   * Initialize bucket metadata for each partition
+   *
+   * @param table
+   * @param partitions partitions that need to be initialized
+   */
+  @Override
+  protected void initialize(HoodieTable table, List<String> partitions) {

Review Comment:
   Please check my comment above regarding making Index impl stateless



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+/**
+ * Consistent hashing bucket index implementation, with auto-adjust bucket number.
+ * NOTE: bucket resizing is triggered by clustering.
+ */
+public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieSparkConsistentBucketIndex.class);
+
+  private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
+
+  public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException {
+    return writeStatuses;
+  }
+
+  /**
+   * Do nothing.
+   * A failed write may create a hashing metadata for a partition. In this case, we still do nothing when rolling back
+   * the failed write. Because the hashing metadata created by a writer must have 00000000000000 timestamp and can be viewed
+   * as the initialization of a partition rather than as a part of the failed write.
+   *
+   * @param instantTime

Review Comment:
   Let's omit things not filled in the java-doc



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LazyIterableIterator.java:
##########
@@ -45,7 +45,9 @@ public LazyIterableIterator(Iterator<I> in) {
   /**
    * Called once, before any elements are processed.
    */
-  protected abstract void start();
+  protected void start() {

Review Comment:
   nit: Let's keep empty impl to just 1 line 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.hash.HashID;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class ConsistentBucketIdentifier extends BucketIdentifier {
+
+  /**
+   * Hashing metadata of a partition
+   */
+  private final HoodieConsistentHashingMetadata metadata;
+  /**
+   * In-memory structure to speed up ring mapping (hashing value -> hashing node)
+   */
+  private final TreeMap<Integer, ConsistentHashingNode> ring;
+  /**
+   * Mapping from fileId -> hashing node
+   */
+  private final Map<String, ConsistentHashingNode> fileIdToBucket;
+
+  public ConsistentBucketIdentifier(HoodieConsistentHashingMetadata metadata) {
+    this.metadata = metadata;
+    this.fileIdToBucket = new HashMap<>();
+    this.ring = new TreeMap<>();
+    initialize();
+  }
+
+  public Collection<ConsistentHashingNode> getNodes() {
+    return ring.values();
+  }
+
+  public HoodieConsistentHashingMetadata getMetadata() {
+    return metadata;
+  }
+
+  public int getNumBuckets() {
+    return getNodes().size();
+  }
+
+  /**
+   * Get bucket of the given file group
+   *
+   * @param fileId the file group id. NOTE: not filePfx (i.e., uuid)
+   * @return
+   */
+  public ConsistentHashingNode getBucketByFileId(String fileId) {
+    return fileIdToBucket.get(fileId);
+  }
+
+  public ConsistentHashingNode getBucket(HoodieKey hoodieKey, String indexKeyFields) {
+    return getBucket(getHashKeys(hoodieKey, indexKeyFields));
+  }
+
+  protected ConsistentHashingNode getBucket(List<String> hashKeys) {
+    int hashValue = 0;
+    for (int i = 0; i < hashKeys.size(); ++i) {
+      hashValue = HashID.getXXHash32(hashKeys.get(i), hashValue);

Review Comment:
   Any reason to make it sequential instead of concatenating the keys (with delimiter) and then invoking hash only once?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java:
##########
@@ -37,28 +36,30 @@
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.List;
 
 /**
  * Hash indexing mechanism.
  */
-public class HoodieBucketIndex extends HoodieIndex<Object, Object> {
+public abstract class HoodieBucketIndex extends HoodieIndex<Object, Object> {
 
-  private static final Logger LOG =  LogManager.getLogger(HoodieBucketIndex.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieBucketIndex.class);
 
-  private final int numBuckets;
+  protected final int numBuckets;
+  protected final String indexKeyFields;

Review Comment:
   Let's split the hash-keys here to avoid doing that in the hot-path



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * All the metadata that is used for consistent hashing bucket index
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class HoodieConsistentHashingMetadata implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieConsistentHashingMetadata.class);
+  /**
+   * Upper-bound of the hash value
+   */
+  public static final int HASH_VALUE_MASK = Integer.MAX_VALUE;
+  public static final String HASHING_METADATA_FILE_SUFFIX = ".hashing_meta";
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  static {
+    MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+    MAPPER.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+  }
+
+  private final short version;
+  private final String partitionPath;
+  private final String instant;
+  private final int numBuckets;
+  private final int seqNo;
+  private final List<ConsistentHashingNode> nodes;
+
+  @JsonCreator
+  public HoodieConsistentHashingMetadata(@JsonProperty("version") short version, @JsonProperty("partitionPath") String partitionPath,
+                                         @JsonProperty("instant") String instant, @JsonProperty("numBuckets") int numBuckets,
+                                         @JsonProperty("seqNo") int seqNo, @JsonProperty("nodes") List<ConsistentHashingNode> nodes) {
+    this.version = version;
+    this.partitionPath = partitionPath;
+    this.instant = instant;
+    this.numBuckets = numBuckets;
+    this.seqNo = seqNo;
+    this.nodes = nodes;
+  }
+
+  public HoodieConsistentHashingMetadata(String partitionPath, int numBuckets) {
+    this((short) 0, partitionPath, HoodieTimeline.INIT_INSTANT_TS, numBuckets, 0);
+  }
+
+  /**
+   * Construct default metadata with all bucket's file group uuid initialized
+   *
+   * @param partitionPath
+   * @param numBuckets
+   */
+  private HoodieConsistentHashingMetadata(short version, String partitionPath, String instant, int numBuckets, int seqNo) {
+    this.version = version;
+    this.partitionPath = partitionPath;
+    this.instant = instant;
+    this.numBuckets = numBuckets;
+    this.seqNo = seqNo;
+
+    nodes = new ArrayList<>();
+    long step = ((long) HASH_VALUE_MASK + numBuckets - 1) / numBuckets;
+    for (int i = 1; i <= numBuckets; ++i) {

Review Comment:
   Let's extract this to a static method, and instead of having 2 ctors essentially duplicating each other (#1 and #3) and instead invoke former one here



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Simple bucket index implementation, with fixed bucket number.
+ */
+public class HoodieSimpleBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieSimpleBucketIndex.class);
+
+  /**
+   * Mapping from partitionPath -> bucketId -> fileInfo
+   */
+  private Map<String, Map<Integer, HoodieRecordLocation>> partitionPathFileIDList;

Review Comment:
   Why do we need to maintain this mutable field, when it's only being used in `tagLocation`? 
   Let's avoid making these components stateful, and instead create a smaller object which different index impls can override and then inside `tagLocation` we can use it like following:
   
   ```
   abstract LocationMapper getLocationMapper()
   
   HoodieData tagLocation() {
     mapper = getLocationMapper()
     loc = mapper.getRecordLocation(key, partitionPath);
     // ...
   }
   ```



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.RealtimeFileStatus;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
+import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Test consistent hashing index
+ */
+@Tag("functional")
+public class TestConsistentBucketIndex extends HoodieClientTestHarness {
+
+  private final Random random = new Random();

Review Comment:
   Please fix the seed. Tests should be reproducible, otherwise they become close to impossible to troubleshoot



##########
hudi-common/src/main/java/org/apache/hudi/common/model/ConsistentHashingNode.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Used in consistent hashing index, representing nodes in the consistent hash ring.
+ * Record the end hash range value and its corresponding file group id.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ConsistentHashingNode implements Serializable {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  static {
+    MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);

Review Comment:
   nit: I see this being init'ed twice just in this PR. Shall we extract this to a common JSON ser/de utility?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.hash.HashID;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class ConsistentBucketIdentifier extends BucketIdentifier {
+
+  /**
+   * Hashing metadata of a partition
+   */
+  private final HoodieConsistentHashingMetadata metadata;
+  /**
+   * In-memory structure to speed up ring mapping (hashing value -> hashing node)
+   */
+  private final TreeMap<Integer, ConsistentHashingNode> ring;
+  /**
+   * Mapping from fileId -> hashing node
+   */
+  private final Map<String, ConsistentHashingNode> fileIdToBucket;
+
+  public ConsistentBucketIdentifier(HoodieConsistentHashingMetadata metadata) {
+    this.metadata = metadata;
+    this.fileIdToBucket = new HashMap<>();
+    this.ring = new TreeMap<>();
+    initialize();
+  }
+
+  public Collection<ConsistentHashingNode> getNodes() {
+    return ring.values();
+  }
+
+  public HoodieConsistentHashingMetadata getMetadata() {
+    return metadata;
+  }
+
+  public int getNumBuckets() {
+    return getNodes().size();
+  }
+
+  /**
+   * Get bucket of the given file group
+   *
+   * @param fileId the file group id. NOTE: not filePfx (i.e., uuid)
+   * @return
+   */
+  public ConsistentHashingNode getBucketByFileId(String fileId) {
+    return fileIdToBucket.get(fileId);
+  }
+
+  public ConsistentHashingNode getBucket(HoodieKey hoodieKey, String indexKeyFields) {
+    return getBucket(getHashKeys(hoodieKey, indexKeyFields));
+  }
+
+  protected ConsistentHashingNode getBucket(List<String> hashKeys) {

Review Comment:
   What's the reason to keep this protected, not private?



##########
hudi-common/src/main/java/org/apache/hudi/common/model/ConsistentHashingNode.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Used in consistent hashing index, representing nodes in the consistent hash ring.
+ * Record the end hash range value and its corresponding file group id.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ConsistentHashingNode implements Serializable {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  static {
+    MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+    MAPPER.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+  }
+
+  private final int value;
+  private final String fileIdPfx;

Review Comment:
   "Pfx" isn't widely recognized acronym (unlike "loc", for ex), i'd suggest to go for "prefix" instead



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIdentifier.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.hash.HashID;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class ConsistentBucketIdentifier extends BucketIdentifier {
+
+  /**
+   * Hashing metadata of a partition
+   */
+  private final HoodieConsistentHashingMetadata metadata;
+  /**
+   * In-memory structure to speed up ring mapping (hashing value -> hashing node)
+   */
+  private final TreeMap<Integer, ConsistentHashingNode> ring;
+  /**
+   * Mapping from fileId -> hashing node
+   */
+  private final Map<String, ConsistentHashingNode> fileIdToBucket;
+
+  public ConsistentBucketIdentifier(HoodieConsistentHashingMetadata metadata) {
+    this.metadata = metadata;
+    this.fileIdToBucket = new HashMap<>();
+    this.ring = new TreeMap<>();
+    initialize();
+  }
+
+  public Collection<ConsistentHashingNode> getNodes() {
+    return ring.values();
+  }
+
+  public HoodieConsistentHashingMetadata getMetadata() {
+    return metadata;
+  }
+
+  public int getNumBuckets() {
+    return getNodes().size();
+  }
+
+  /**
+   * Get bucket of the given file group
+   *
+   * @param fileId the file group id. NOTE: not filePfx (i.e., uuid)
+   * @return
+   */
+  public ConsistentHashingNode getBucketByFileId(String fileId) {
+    return fileIdToBucket.get(fileId);
+  }
+
+  public ConsistentHashingNode getBucket(HoodieKey hoodieKey, String indexKeyFields) {
+    return getBucket(getHashKeys(hoodieKey, indexKeyFields));
+  }
+
+  protected ConsistentHashingNode getBucket(List<String> hashKeys) {
+    int hashValue = 0;
+    for (int i = 0; i < hashKeys.size(); ++i) {
+      hashValue = HashID.getXXHash32(hashKeys.get(i), hashValue);
+    }
+    return getBucket(hashValue & HoodieConsistentHashingMetadata.HASH_VALUE_MASK);
+  }
+
+  protected ConsistentHashingNode getBucket(int hashValue) {

Review Comment:
   As well: why not private?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -86,6 +86,7 @@ public class HoodieTableMetaClient implements Serializable {
   public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + Path.SEPARATOR + ".bootstrap";
   public static final String HEARTBEAT_FOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".heartbeat";
   public static final String METADATA_TABLE_FOLDER_PATH = METAFOLDER_NAME + Path.SEPARATOR + "metadata";
+  public static final String HASHING_METADATA_FOLER_NAME = ".hashing_metadata";

Review Comment:
   Typo



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Simple bucket index implementation, with fixed bucket number.
+ */
+public class HoodieSimpleBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieSimpleBucketIndex.class);
+
+  /**
+   * Mapping from partitionPath -> bucketId -> fileInfo
+   */
+  private Map<String, Map<Integer, HoodieRecordLocation>> partitionPathFileIDList;
+
+  public HoodieSimpleBucketIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  private Map<Integer, HoodieRecordLocation> loadPartitionBucketIdFileIdMapping(
+      HoodieTable hoodieTable,
+      String partition) {
+    // bucketId -> fileIds
+    Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping = new HashMap<>();
+    hoodieTable.getMetaClient().reloadActiveTimeline();
+    HoodieIndexUtils
+        .getLatestBaseFilesForPartition(partition, hoodieTable)
+        .forEach(file -> {
+          String fileId = file.getFileId();
+          String commitTime = file.getCommitTime();
+          int bucketId = BucketIdentifier.bucketIdFromFileId(fileId);
+          if (!bucketIdToFileIdMapping.containsKey(bucketId)) {
+            bucketIdToFileIdMapping.put(bucketId, new HoodieRecordLocation(commitTime, fileId));
+          } else {
+            // Check if bucket data is valid
+            throw new HoodieIOException("Find multiple files at partition path="
+                + partition + " belongs to the same bucket id = " + bucketId);
+          }
+        });
+    return bucketIdToFileIdMapping;
+  }
+
+  @Override
+  public boolean canIndexLogFiles() {
+    return false;
+  }
+
+  @Override
+  protected void initialize(HoodieTable table, List<String> partitions) {
+    partitionPathFileIDList = new HashMap<>();
+    partitions.forEach(p -> partitionPathFileIDList.put(p, loadPartitionBucketIdFileIdMapping(table, p)));

Review Comment:
   nit: Instead of modifying existing map, you can do `partitions.map(...).collect(Collectors.toMap(...))`



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestConsistentBucketIdIdentifier.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASH_VALUE_MASK;
+
+/**
+ * Unit test of consistent bucket identifier
+ */
+public class TestConsistentBucketIdIdentifier {
+
+  @Test
+  public void testGetBucket() {
+    List<ConsistentHashingNode> nodes = Arrays.asList(
+        new ConsistentHashingNode(100, "0"),
+        new ConsistentHashingNode(0x2fffffff, "1"),
+        new ConsistentHashingNode(0x4fffffff, "2"));
+    HoodieConsistentHashingMetadata meta = new HoodieConsistentHashingMetadata((short) 0, "", "", 3, 0, nodes);
+    ConsistentBucketIdentifier identifier = new ConsistentBucketIdentifier(meta);
+
+    Assertions.assertEquals(3, identifier.getNumBuckets());
+
+    // Get bucket by hash keys
+    Assertions.assertEquals(nodes.get(2), identifier.getBucket(Arrays.asList("Hudi")));
+    Assertions.assertEquals(nodes.get(1), identifier.getBucket(Arrays.asList("bucket_index")));
+    Assertions.assertEquals(nodes.get(1), identifier.getBucket(Arrays.asList("consistent_hashing")));
+    Assertions.assertEquals(nodes.get(2), identifier.getBucket(Arrays.asList("bucket_index", "consistent_hashing")));
+    int[] ref1 = {2, 2, 1, 1, 0, 1, 1, 1, 0, 1};
+    int[] ref2 = {0, 2, 2, 1, 2, 0, 1, 2, 0, 1};
+    for (int i = 0; i < 10; ++i) {

Review Comment:
   What's the point of this for loop test?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+/**
+ * Consistent hashing bucket index implementation, with auto-adjust bucket number.
+ * NOTE: bucket resizing is triggered by clustering.
+ */
+public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieSparkConsistentBucketIndex.class);
+
+  private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;

Review Comment:
   Does not seems like it's been addressed



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.RealtimeFileStatus;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
+import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Test consistent hashing index
+ */
+@Tag("functional")
+public class TestConsistentBucketIndex extends HoodieClientTestHarness {
+
+  private final Random random = new Random();
+  private HoodieIndex index;
+  private HoodieWriteConfig config;
+
+  private static Stream<Arguments> configParams() {
+    // preserveMetaField, partitioned
+    Object[][] data = new Object[][] {
+        {true, false},
+        {false, false},
+        {true, true},
+        {false, true},
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  private void setUp(boolean populateMetaFields, boolean partitioned) throws Exception {
+    initPath();
+    initSparkContexts();
+    if (partitioned) {
+      initTestDataGenerator();
+    } else {
+      initTestDataGenerator(new String[] {""});
+    }
+    initFileSystem();
+    Properties props = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
+    metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, props);
+    config = getConfigBuilder()
+        .withProperties(props)
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .fromProperties(props)
+            .withIndexType(HoodieIndex.IndexType.BUCKET)
+            .withIndexKeyField("_row_key")
+            .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING)
+            .build())
+        .withAutoCommit(false)
+        .build();
+    writeClient = getHoodieWriteClient(config);
+    index = writeClient.getIndex();
+  }
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    cleanupResources();
+  }
+
+  /**
+   * Test bucket index tagging (always tag regardless of the write status)
+   * Test bucket index tagging consistency, two tagging result should be same
+   *
+   * @param populateMetaFields
+   * @param partitioned
+   * @throws Exception
+   */
+  @ParameterizedTest
+  @MethodSource("configParams")
+  public void testTagLocation(boolean populateMetaFields, boolean partitioned) throws Exception {
+    setUp(populateMetaFields, partitioned);
+    String newCommitTime = "001";
+    int totalRecords = 20 + random.nextInt(20);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 2);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+
+    // The records should be tagged anyway, even though it is the first time doing tagging
+    List<HoodieRecord> taggedRecord = tagLocation(index, writeRecords, hoodieTable).collect();
+    Assertions.assertTrue(taggedRecord.stream().allMatch(r -> r.isCurrentLocationKnown()));
+
+    // Tag again, the records should get the same location (hashing metadata has been persisted after the first tagging)
+    List<HoodieRecord> taggedRecord2 = tagLocation(index, writeRecords, hoodieTable).collect();
+    for (HoodieRecord ref : taggedRecord) {
+      for (HoodieRecord record : taggedRecord2) {
+        if (ref.getRecordKey().equals(record.getRecordKey())) {

Review Comment:
   Can we match the location against a static fixture? 



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.RealtimeFileStatus;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
+import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Test consistent hashing index
+ */
+@Tag("functional")
+public class TestConsistentBucketIndex extends HoodieClientTestHarness {
+
+  private final Random random = new Random();
+  private HoodieIndex index;
+  private HoodieWriteConfig config;
+
+  private static Stream<Arguments> configParams() {
+    // preserveMetaField, partitioned
+    Object[][] data = new Object[][] {
+        {true, false},
+        {false, false},
+        {true, true},
+        {false, true},
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  private void setUp(boolean populateMetaFields, boolean partitioned) throws Exception {
+    initPath();
+    initSparkContexts();
+    if (partitioned) {
+      initTestDataGenerator();
+    } else {
+      initTestDataGenerator(new String[] {""});
+    }
+    initFileSystem();
+    Properties props = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
+    metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, props);
+    config = getConfigBuilder()
+        .withProperties(props)
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .fromProperties(props)
+            .withIndexType(HoodieIndex.IndexType.BUCKET)
+            .withIndexKeyField("_row_key")
+            .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING)
+            .build())
+        .withAutoCommit(false)
+        .build();
+    writeClient = getHoodieWriteClient(config);
+    index = writeClient.getIndex();
+  }
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    cleanupResources();
+  }
+
+  /**
+   * Test bucket index tagging (always tag regardless of the write status)
+   * Test bucket index tagging consistency, two tagging result should be same
+   *
+   * @param populateMetaFields
+   * @param partitioned
+   * @throws Exception
+   */
+  @ParameterizedTest
+  @MethodSource("configParams")
+  public void testTagLocation(boolean populateMetaFields, boolean partitioned) throws Exception {
+    setUp(populateMetaFields, partitioned);
+    String newCommitTime = "001";
+    int totalRecords = 20 + random.nextInt(20);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 2);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+
+    // The records should be tagged anyway, even though it is the first time doing tagging
+    List<HoodieRecord> taggedRecord = tagLocation(index, writeRecords, hoodieTable).collect();
+    Assertions.assertTrue(taggedRecord.stream().allMatch(r -> r.isCurrentLocationKnown()));
+
+    // Tag again, the records should get the same location (hashing metadata has been persisted after the first tagging)
+    List<HoodieRecord> taggedRecord2 = tagLocation(index, writeRecords, hoodieTable).collect();
+    for (HoodieRecord ref : taggedRecord) {
+      for (HoodieRecord record : taggedRecord2) {
+        if (ref.getRecordKey().equals(record.getRecordKey())) {
+          Assertions.assertEquals(ref.getCurrentLocation(), record.getCurrentLocation());
+          break;
+        }
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @MethodSource("configParams")
+  public void testWriteData(boolean populateMetaFields, boolean partitioned) throws Exception {
+    setUp(populateMetaFields, partitioned);
+    String newCommitTime = "001";
+    int totalRecords = 20 + random.nextInt(20);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 2);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    // Insert totalRecords records
+    writeClient.startCommitWithTime(newCommitTime);
+    List<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime).collect();
+    org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatues);
+    Assertions.assertTrue(writeClient.commitStats(newCommitTime, writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),

Review Comment:
   Let's extract invocation out of assertion into standalone var, it's very hard to understand the expression right now



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.RealtimeFileStatus;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
+import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Test consistent hashing index
+ */
+@Tag("functional")
+public class TestConsistentBucketIndex extends HoodieClientTestHarness {
+
+  private final Random random = new Random();
+  private HoodieIndex index;
+  private HoodieWriteConfig config;
+
+  private static Stream<Arguments> configParams() {
+    // preserveMetaField, partitioned
+    Object[][] data = new Object[][] {
+        {true, false},
+        {false, false},
+        {true, true},
+        {false, true},
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  private void setUp(boolean populateMetaFields, boolean partitioned) throws Exception {
+    initPath();
+    initSparkContexts();
+    if (partitioned) {
+      initTestDataGenerator();
+    } else {
+      initTestDataGenerator(new String[] {""});
+    }
+    initFileSystem();
+    Properties props = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
+    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
+    metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, props);
+    config = getConfigBuilder()
+        .withProperties(props)
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .fromProperties(props)
+            .withIndexType(HoodieIndex.IndexType.BUCKET)
+            .withIndexKeyField("_row_key")
+            .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING)
+            .build())
+        .withAutoCommit(false)
+        .build();
+    writeClient = getHoodieWriteClient(config);
+    index = writeClient.getIndex();
+  }
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    cleanupResources();
+  }
+
+  /**
+   * Test bucket index tagging (always tag regardless of the write status)
+   * Test bucket index tagging consistency, two tagging result should be same
+   *
+   * @param populateMetaFields
+   * @param partitioned
+   * @throws Exception
+   */
+  @ParameterizedTest
+  @MethodSource("configParams")
+  public void testTagLocation(boolean populateMetaFields, boolean partitioned) throws Exception {
+    setUp(populateMetaFields, partitioned);
+    String newCommitTime = "001";
+    int totalRecords = 20 + random.nextInt(20);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 2);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+
+    // The records should be tagged anyway, even though it is the first time doing tagging
+    List<HoodieRecord> taggedRecord = tagLocation(index, writeRecords, hoodieTable).collect();
+    Assertions.assertTrue(taggedRecord.stream().allMatch(r -> r.isCurrentLocationKnown()));
+
+    // Tag again, the records should get the same location (hashing metadata has been persisted after the first tagging)
+    List<HoodieRecord> taggedRecord2 = tagLocation(index, writeRecords, hoodieTable).collect();
+    for (HoodieRecord ref : taggedRecord) {
+      for (HoodieRecord record : taggedRecord2) {
+        if (ref.getRecordKey().equals(record.getRecordKey())) {
+          Assertions.assertEquals(ref.getCurrentLocation(), record.getCurrentLocation());
+          break;
+        }
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @MethodSource("configParams")
+  public void testWriteData(boolean populateMetaFields, boolean partitioned) throws Exception {
+    setUp(populateMetaFields, partitioned);
+    String newCommitTime = "001";
+    int totalRecords = 20 + random.nextInt(20);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 2);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    // Insert totalRecords records
+    writeClient.startCommitWithTime(newCommitTime);
+    List<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime).collect();
+    org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatues);
+    Assertions.assertTrue(writeClient.commitStats(newCommitTime, writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(), metaClient.getCommitActionType()));
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    // The number of distinct fileId should be the same as total log file numbers
+    Assertions.assertEquals(writeStatues.stream().map(WriteStatus::getFileId).distinct().count(),
+        Arrays.stream(dataGen.getPartitionPaths()).mapToInt(p -> Objects.requireNonNull(listStatus(p, true)).length).sum());

Review Comment:
   Let's also assert the records themselves, by reading those back 



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+/**
+ * Consistent hashing bucket index implementation, with auto-adjust bucket number.
+ * NOTE: bucket resizing is triggered by clustering.
+ */
+public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieSparkConsistentBucketIndex.class);
+
+  private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
+
+  public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException {
+    return writeStatuses;
+  }
+
+  /**
+   * Do nothing.
+   * A failed write may create a hashing metadata for a partition. In this case, we still do nothing when rolling back
+   * the failed write. Because the hashing metadata created by a writer must have 00000000000000 timestamp and can be viewed
+   * as the initialization of a partition rather than as a part of the failed write.
+   *
+   * @param instantTime
+   * @return
+   */
+  @Override
+  public boolean rollbackCommit(String instantTime) {
+    return true;
+  }
+
+  /**
+   * Initialize bucket metadata for each partition
+   *
+   * @param table
+   * @param partitions partitions that need to be initialized
+   */
+  @Override
+  protected void initialize(HoodieTable table, List<String> partitions) {
+    partitionToIdentifier = new HashMap(partitions.size() + partitions.size() / 3);
+
+    // TODO maybe parallel
+    partitions.stream().forEach(p -> {
+      HoodieConsistentHashingMetadata metadata = loadOrCreateMetadata(table, p);
+      ConsistentBucketIdentifier identifier = new ConsistentBucketIdentifier(metadata);
+      partitionToIdentifier.put(p, identifier);
+    });
+  }
+
+  /**
+   * Get bucket location for given key and partition
+   *
+   * @param key
+   * @param partitionPath
+   * @return
+   */
+  @Override
+  protected HoodieRecordLocation getBucket(HoodieKey key, String partitionPath) {
+    ConsistentHashingNode node = partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields);
+    if (node.getFileIdPfx() != null && !node.getFileIdPfx().isEmpty()) {

Review Comment:
   There's `isNullOrEmpty` utility specifically for that



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -86,6 +86,7 @@ public class HoodieTableMetaClient implements Serializable {
   public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + Path.SEPARATOR + ".bootstrap";
   public static final String HEARTBEAT_FOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".heartbeat";
   public static final String METADATA_TABLE_FOLDER_PATH = METAFOLDER_NAME + Path.SEPARATOR + "metadata";
+  public static final String HASHING_METADATA_FOLER_NAME = ".hashing_metadata";

Review Comment:
   Shall we call this `.bucket_index` instead? "hashing_metadata" is too generic and doesn't really hint you at what it's all about



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+/**
+ * Consistent hashing bucket index implementation, with auto-adjust bucket number.
+ * NOTE: bucket resizing is triggered by clustering.
+ */
+public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieSparkConsistentBucketIndex.class);
+
+  private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
+
+  public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException {

Review Comment:
   This seem to be identical to the overridden impl



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+/**
+ * Consistent hashing bucket index implementation, with auto-adjust bucket number.
+ * NOTE: bucket resizing is triggered by clustering.
+ */
+public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieSparkConsistentBucketIndex.class);
+
+  private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
+
+  public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses, HoodieEngineContext context, HoodieTable hoodieTable) throws HoodieIndexException {
+    return writeStatuses;
+  }
+
+  /**
+   * Do nothing.
+   * A failed write may create a hashing metadata for a partition. In this case, we still do nothing when rolling back
+   * the failed write. Because the hashing metadata created by a writer must have 00000000000000 timestamp and can be viewed
+   * as the initialization of a partition rather than as a part of the failed write.
+   *
+   * @param instantTime
+   * @return
+   */
+  @Override
+  public boolean rollbackCommit(String instantTime) {
+    return true;
+  }
+
+  /**
+   * Initialize bucket metadata for each partition
+   *
+   * @param table
+   * @param partitions partitions that need to be initialized
+   */
+  @Override
+  protected void initialize(HoodieTable table, List<String> partitions) {
+    partitionToIdentifier = new HashMap(partitions.size() + partitions.size() / 3);
+
+    // TODO maybe parallel
+    partitions.stream().forEach(p -> {
+      HoodieConsistentHashingMetadata metadata = loadOrCreateMetadata(table, p);
+      ConsistentBucketIdentifier identifier = new ConsistentBucketIdentifier(metadata);
+      partitionToIdentifier.put(p, identifier);
+    });
+  }
+
+  /**
+   * Get bucket location for given key and partition
+   *
+   * @param key
+   * @param partitionPath
+   * @return
+   */
+  @Override
+  protected HoodieRecordLocation getBucket(HoodieKey key, String partitionPath) {
+    ConsistentHashingNode node = partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields);
+    if (node.getFileIdPfx() != null && !node.getFileIdPfx().isEmpty()) {
+      /**
+       * Dynamic Bucket Index doesn't need the instant time of the latest file group.
+       * We add suffix 0 here to the file uuid, following the naming convention, i.e., fileId = [uuid]_[numWrites]
+       */
+      return new HoodieRecordLocation(null, FSUtils.createNewFileId(node.getFileIdPfx(), 0));
+    }
+
+    LOG.error("Consistent hashing node has no file group, partition: " + partitionPath + ", meta: "
+        + partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ", record_key: " + key.toString());
+    throw new HoodieIndexException("Failed to getBucket as hashing node has no file group");
+  }
+
+  /**
+   * Load hashing metadata of the given partition, if it is not existed, create a new one (also persist it into storage)
+   *
+   * @param table     hoodie table
+   * @param partition table partition
+   * @return Consistent hashing metadata
+   */
+  public HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable table, String partition) {
+    HoodieConsistentHashingMetadata metadata = loadMetadata(table, partition);
+    if (metadata != null) {
+      return metadata;
+    }
+
+    // There is no metadata, so try to create a new one and save it.
+    metadata = new HoodieConsistentHashingMetadata(partition, numBuckets);
+    if (saveMetadata(table, metadata, false)) {
+      return metadata;
+    }
+
+    // The creation failed, so try load metadata again. Concurrent creation of metadata should have succeeded.
+    // Note: the consistent problem of cloud storage is handled internal in the HoodieWrapperFileSystem, i.e., ConsistentGuard
+    metadata = loadMetadata(table, partition);
+    ValidationUtils.checkState(metadata != null, "Failed to load or create metadata, partition: " + partition);
+    return metadata;
+  }
+
+  /**
+   * Load hashing metadata of the given partition, if it is not existed, return null
+   *
+   * @param table     hoodie table
+   * @param partition table partition
+   * @return Consistent hashing metadata or null if it does not exist
+   */
+  public static HoodieConsistentHashingMetadata loadMetadata(HoodieTable table, String partition) {
+    Path metadataPath = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), partition);
+
+    try {
+      if (!table.getMetaClient().getFs().exists(metadataPath)) {
+        return null;
+      }
+      FileStatus[] metaFiles = table.getMetaClient().getFs().listStatus(metadataPath);
+      final HoodieTimeline completedCommits = table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+      Predicate<FileStatus> metaFilePredicate = fileStatus -> {
+        String filename = fileStatus.getPath().getName();
+        if (!filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX)) {
+          return false;
+        }
+        String timestamp = HoodieConsistentHashingMetadata.getTimestampFromFile(filename);
+        return completedCommits.containsInstant(timestamp) || timestamp.equals(HoodieTimeline.INIT_INSTANT_TS);
+      };
+
+      // Get a valid hashing metadata with the largest (latest) timestamp
+      FileStatus metaFile = Arrays.stream(metaFiles).filter(metaFilePredicate)
+          .max(Comparator.comparing(a -> a.getPath().getName())).orElse(null);
+
+      if (metaFile == null) {
+        return null;
+      }
+
+      byte[] content = FileIOUtils.readAsByteArray(table.getMetaClient().getFs().open(metaFile.getPath()));
+      return HoodieConsistentHashingMetadata.fromBytes(content);
+    } catch (IOException e) {
+      LOG.warn("Error when loading hashing metadata, partition: " + partition, e);
+      throw new HoodieIndexException("Error while loading hashing metadata", e);
+    }
+  }
+
+  /**
+   * Save metadata into storage
+   *
+   * @param table
+   * @param metadata
+   * @param overwrite
+   * @return
+   */
+  private static boolean saveMetadata(HoodieTable table, HoodieConsistentHashingMetadata metadata, boolean overwrite) {
+    FSDataOutputStream fsOut = null;
+    HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
+    Path dir = FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(), metadata.getPartitionPath());
+    Path fullPath = new Path(dir, metadata.getFilename());
+    try {
+      byte[] bytes = metadata.toBytes();
+      fsOut = fs.create(fullPath, overwrite);
+      fsOut.write(bytes);
+      fsOut.close();
+      return true;
+    } catch (IOException e) {
+      LOG.warn("Failed to update bucket metadata: " + metadata, e);
+    } finally {
+      try {
+        if (fsOut != null) {
+          fsOut.close();

Review Comment:
   You can do try-with-resources like following to avoid manual closing
   ```
   try (FSDataOutputStream fsOut = ...) { ... }
   ``` 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org