Posted to commits@hudi.apache.org by vi...@apache.org on 2020/05/18 01:32:35 UTC

[incubator-hudi] branch master updated: [HUDI-407] Adding Simple Index to Hoodie. (#1402)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 29edf4b  [HUDI-407] Adding Simple Index to Hoodie. (#1402)
29edf4b is described below

commit 29edf4b3b8ade64ec7822d6b7b2a125d5ca781c4
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Sun May 17 21:32:24 2020 -0400

    [HUDI-407] Adding Simple Index to Hoodie. (#1402)
    
    This index finds the location by joining incoming records with records from base files.
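
A minimal usage sketch (not part of the patch; basePath and the numeric values are illustrative placeholders) showing how the new index type can be enabled through the config builders this commit extends:

    // Enable the new SIMPLE index via the HoodieWriteConfig / HoodieIndexConfig builders.
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath(basePath)                                  // placeholder base path
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.SIMPLE)     // or GLOBAL_SIMPLE
            .withSimpleIndexParallelism(100)                 // hoodie.simple.index.parallelism
            .simpleIndexUseCaching(true)                      // hoodie.simple.index.use.caching
            .build())
        .build();
    HoodieIndex index = HoodieIndex.createIndex(config);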
---
 .../apache/hudi/client/utils/SparkConfigUtils.java |   4 +
 .../org/apache/hudi/config/HoodieIndexConfig.java  |  46 ++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  16 +
 .../java/org/apache/hudi/index/HoodieIndex.java    |  15 +-
 .../org/apache/hudi/index/HoodieIndexUtils.java    |  90 ++++
 .../apache/hudi/index/bloom/HoodieBloomIndex.java  |  35 +-
 .../hudi/index/bloom/HoodieGlobalBloomIndex.java   |   7 +-
 .../hudi/index/simple/HoodieGlobalSimpleIndex.java | 169 +++++++
 .../hudi/index/simple/HoodieSimpleIndex.java       | 181 ++++++++
 .../hudi/io/HoodieKeyLocationFetchHandle.java      |  57 +++
 .../org/apache/hudi/index/TestHoodieIndex.java     | 510 +++++++++++++++++++--
 .../hudi/index/bloom/TestHoodieBloomIndex.java     |   1 -
 .../hudi/io/TestHoodieKeyLocationFetchHandle.java  | 210 +++++++++
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  18 +
 .../org/apache/hudi/common/util/ParquetUtils.java  |  52 ++-
 .../apache/hudi/common/util/TestParquetUtils.java  |  35 +-
 16 files changed, 1381 insertions(+), 65 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/utils/SparkConfigUtils.java b/hudi-client/src/main/java/org/apache/hudi/client/utils/SparkConfigUtils.java
index 604be01..0a6b608 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/utils/SparkConfigUtils.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/utils/SparkConfigUtils.java
@@ -99,4 +99,8 @@ public class SparkConfigUtils {
     String fraction = properties.getProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP, DEFAULT_MAX_MEMORY_FRACTION_FOR_COMPACTION);
     return getMaxMemoryAllowedForMerge(fraction);
   }
+
+  public static StorageLevel getSimpleIndexInputStorageLevel(Properties properties) {
+    return StorageLevel.fromString(properties.getProperty(HoodieIndexConfig.SIMPLE_INDEX_INPUT_STORAGE_LEVEL));
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index df2177e..4e974af 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -62,6 +62,12 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
   public static final String DEFAULT_BLOOM_INDEX_FILTER_TYPE = BloomFilterTypeCode.SIMPLE.name();
   public static final String HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = "hoodie.bloom.index.filter.dynamic.max.entries";
   public static final String DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES = "100000";
+  public static final String SIMPLE_INDEX_USE_CACHING_PROP = "hoodie.simple.index.use.caching";
+  public static final String DEFAULT_SIMPLE_INDEX_USE_CACHING = "true";
+  public static final String SIMPLE_INDEX_PARALLELISM_PROP = "hoodie.simple.index.parallelism";
+  public static final String DEFAULT_SIMPLE_INDEX_PARALLELISM = "0";
+  public static final String GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP = "hoodie.global.simple.index.parallelism";
+  public static final String DEFAULT_GLOBAL_SIMPLE_INDEX_PARALLELISM = "0";
 
   // 1B bloom filter checks happen in 250 seconds. 500ms to read a bloom filter.
   // 10M checks in 2500ms, thus amortizing the cost of reading bloom filter across partitions.
@@ -80,6 +86,8 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
 
   public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL = "hoodie.bloom.index.input.storage.level";
   public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
+  public static final String SIMPLE_INDEX_INPUT_STORAGE_LEVEL = "hoodie.simple.index.input.storage.level";
+  public static final String DEFAULT_SIMPLE_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
 
   /**
    * Only applies if index type is GLOBAL_BLOOM.
@@ -92,6 +100,9 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
   public static final String BLOOM_INDEX_UPDATE_PARTITION_PATH = "hoodie.bloom.index.update.partition.path";
   public static final String DEFAULT_BLOOM_INDEX_UPDATE_PARTITION_PATH = "false";
 
+  public static final String SIMPLE_INDEX_UPDATE_PARTITION_PATH = "hoodie.simple.index.update.partition.path";
+  public static final String DEFAULT_SIMPLE_INDEX_UPDATE_PARTITION_PATH = "false";
+
   private HoodieIndexConfig(Properties props) {
     super(props);
   }
@@ -201,6 +212,31 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withSimpleIndexParallelism(int parallelism) {
+      props.setProperty(SIMPLE_INDEX_PARALLELISM_PROP, String.valueOf(parallelism));
+      return this;
+    }
+
+    public Builder simpleIndexUseCaching(boolean useCaching) {
+      props.setProperty(SIMPLE_INDEX_USE_CACHING_PROP, String.valueOf(useCaching));
+      return this;
+    }
+
+    public Builder withSimpleIndexInputStorageLevel(String level) {
+      props.setProperty(SIMPLE_INDEX_INPUT_STORAGE_LEVEL, level);
+      return this;
+    }
+
+    public Builder withGlobalSimpleIndexParallelism(int parallelism) {
+      props.setProperty(GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP, String.valueOf(parallelism));
+      return this;
+    }
+
+    public Builder withGlobalSimpleIndexUpdatePartitionPath(boolean updatePartitionPath) {
+      props.setProperty(SIMPLE_INDEX_UPDATE_PARTITION_PATH, String.valueOf(updatePartitionPath));
+      return this;
+    }
+
     public HoodieIndexConfig build() {
       HoodieIndexConfig config = new HoodieIndexConfig(props);
       setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), INDEX_TYPE_PROP, DEFAULT_INDEX_TYPE);
@@ -228,6 +264,16 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
           BLOOM_INDEX_FILTER_TYPE, DEFAULT_BLOOM_INDEX_FILTER_TYPE);
       setDefaultOnCondition(props, !props.contains(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES),
           HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, DEFAULT_HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES);
+      setDefaultOnCondition(props, !props.containsKey(SIMPLE_INDEX_PARALLELISM_PROP), SIMPLE_INDEX_PARALLELISM_PROP,
+          DEFAULT_SIMPLE_INDEX_PARALLELISM);
+      setDefaultOnCondition(props, !props.containsKey(SIMPLE_INDEX_USE_CACHING_PROP), SIMPLE_INDEX_USE_CACHING_PROP,
+          DEFAULT_SIMPLE_INDEX_USE_CACHING);
+      setDefaultOnCondition(props, !props.containsKey(SIMPLE_INDEX_INPUT_STORAGE_LEVEL), SIMPLE_INDEX_INPUT_STORAGE_LEVEL,
+          DEFAULT_SIMPLE_INDEX_INPUT_STORAGE_LEVEL);
+      setDefaultOnCondition(props, !props.containsKey(GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP), GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP,
+          DEFAULT_GLOBAL_SIMPLE_INDEX_PARALLELISM);
+      setDefaultOnCondition(props, !props.containsKey(SIMPLE_INDEX_UPDATE_PARTITION_PATH),
+          SIMPLE_INDEX_UPDATE_PARTITION_PATH, DEFAULT_SIMPLE_INDEX_UPDATE_PARTITION_PATH);
       // Throws IllegalArgumentException if the value set is not a known Hoodie Index Type
       HoodieIndex.IndexType.valueOf(props.getProperty(INDEX_TYPE_PROP));
       return config;
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 2d98edc..0467657 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -441,6 +441,22 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH));
   }
 
+  public int getSimpleIndexParallelism() {
+    return Integer.parseInt(props.getProperty(HoodieIndexConfig.SIMPLE_INDEX_PARALLELISM_PROP));
+  }
+
+  public boolean getSimpleIndexUseCaching() {
+    return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.SIMPLE_INDEX_USE_CACHING_PROP));
+  }
+
+  public int getGlobalSimpleIndexParallelism() {
+    return Integer.parseInt(props.getProperty(HoodieIndexConfig.GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP));
+  }
+
+  public boolean getGlobalSimpleIndexUpdatePartitionPath() {
+    return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.SIMPLE_INDEX_UPDATE_PARTITION_PATH));
+  }
+
   /**
    * storage properties.
    */
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java
index 93fcc89..03a965a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java
@@ -32,6 +32,8 @@ import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.bloom.HoodieBloomIndex;
 import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex;
 import org.apache.hudi.index.hbase.HBaseIndex;
+import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex;
+import org.apache.hudi.index.simple.HoodieSimpleIndex;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.spark.api.java.JavaPairRDD;
@@ -70,6 +72,10 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
         return new HoodieBloomIndex<>(config);
       case GLOBAL_BLOOM:
         return new HoodieGlobalBloomIndex<>(config);
+      case SIMPLE:
+        return new HoodieSimpleIndex<>(config);
+      case GLOBAL_SIMPLE:
+        return new HoodieGlobalSimpleIndex<>(config);
       default:
         throw new HoodieIndexException("Index type unspecified, set " + config.getIndexType());
     }
@@ -87,7 +93,7 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
    * present).
    */
   public abstract JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
-      HoodieTable<T> hoodieTable) throws HoodieIndexException;
+                                                       HoodieTable<T> hoodieTable) throws HoodieIndexException;
 
   /**
    * Extracts the location of written records, and updates the index.
@@ -95,7 +101,7 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
    * TODO(vc): We may need to propagate the record as well in a WriteStatus class
    */
   public abstract JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
-      HoodieTable<T> hoodieTable) throws HoodieIndexException;
+                                                      HoodieTable<T> hoodieTable) throws HoodieIndexException;
 
   /**
   * Rollback the effects of the commit made at instantTime.
@@ -128,9 +134,10 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
   /**
   * Each index type should implement its own logic to release any resources acquired during the process.
    */
-  public void close() {}
+  public void close() {
+  }
 
   public enum IndexType {
-    HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM
+    HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE
   }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
new file mode 100644
index 0000000..806dcf5
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Hoodie Index Utilities.
+ */
+public class HoodieIndexUtils {
+
+  /**
+   * Fetches Pair of partition path and {@link HoodieBaseFile}s for interested partitions.
+   *
+   * @param partitions  list of partitions of interest
+   * @param jsc         instance of {@link JavaSparkContext} to use
+   * @param hoodieTable instance of {@link HoodieTable} of interest
+   * @return the list of Pairs of partition path and fileId
+   */
+  public static List<Pair<String, HoodieBaseFile>> getLatestBaseFilesForAllPartitions(final List<String> partitions,
+                                                                                      final JavaSparkContext jsc,
+                                                                                      final HoodieTable hoodieTable) {
+    return jsc.parallelize(partitions, Math.max(partitions.size(), 1))
+        .flatMap(partitionPath -> {
+          Option<HoodieInstant> latestCommitTime = hoodieTable.getMetaClient().getCommitsTimeline()
+              .filterCompletedInstants().lastInstant();
+          List<Pair<String, HoodieBaseFile>> filteredFiles = new ArrayList<>();
+          if (latestCommitTime.isPresent()) {
+            filteredFiles = hoodieTable.getBaseFileOnlyView()
+                .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
+                .map(f -> Pair.of(partitionPath, f))
+                .collect(toList());
+          }
+          return filteredFiles.iterator();
+        })
+        .collect();
+  }
+
+  /**
+   * Get tagged record for the passed in {@link HoodieRecord}.
+   *
+   * @param inputRecord instance of {@link HoodieRecord} for which tagging is requested
+   * @param location    {@link HoodieRecordLocation} for the passed in {@link HoodieRecord}
+   * @return the tagged {@link HoodieRecord}
+   */
+  public static HoodieRecord getTaggedRecord(HoodieRecord inputRecord, Option<HoodieRecordLocation> location) {
+    HoodieRecord record = inputRecord;
+    if (location.isPresent()) {
+      // When you have a record in multiple files in the same partition, then rowKeyRecordPairRDD
+      // will have 2 entries with the same exact in memory copy of the HoodieRecord and the 2
+      // separate filenames that the record is found in. This will result in setting
+      // currentLocation 2 times and it will fail the second time. So creating a new in memory
+      // copy of the hoodie record.
+      record = new HoodieRecord<>(inputRecord);
+      record.unseal();
+      record.setCurrentLocation(location.get());
+      record.seal();
+    }
+    return record;
+  }
+}
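
The two helpers above are used later in this patch by both the bloom and simple indexes. A rough sketch of the intended call pattern (assumes partitions, jsc, hoodieTable, incomingRecord and knownLocation are in scope):

    // Resolve the latest base file per partition of interest ...
    List<Pair<String, HoodieBaseFile>> latestBaseFiles =
        HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, jsc, hoodieTable);
    // ... look up the record's HoodieRecordLocation from those files (see HoodieSimpleIndex below),
    // then tag the record; an empty Option leaves the record untagged.
    HoodieRecord taggedRecord =
        HoodieIndexUtils.getTaggedRecord(incomingRecord, Option.ofNullable(knownLocation));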
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index 35ac526..ca960ef 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -24,12 +24,12 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.MetadataNotFoundException;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.io.HoodieRangeInfoHandle;
 import org.apache.hudi.table.HoodieTable;
 
@@ -52,6 +52,7 @@ import scala.Tuple2;
 import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.mapping;
 import static java.util.stream.Collectors.toList;
+import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
 
 /**
  * Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in its metadata.
@@ -192,18 +193,9 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
                                                              final HoodieTable hoodieTable) {
 
     // Obtain the latest data files from all the partitions.
-    List<Pair<String, String>> partitionPathFileIDList =
-        jsc.parallelize(partitions, Math.max(partitions.size(), 1)).flatMap(partitionPath -> {
-          Option<HoodieInstant> latestCommitTime =
-              hoodieTable.getMetaClient().getCommitsTimeline().filterCompletedInstants().lastInstant();
-          List<Pair<String, String>> filteredFiles = new ArrayList<>();
-          if (latestCommitTime.isPresent()) {
-            filteredFiles = hoodieTable.getBaseFileOnlyView()
-                .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
-                .map(f -> Pair.of(partitionPath, f.getFileId())).collect(toList());
-          }
-          return filteredFiles.iterator();
-        }).collect();
+    List<Pair<String, String>> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, jsc, hoodieTable).stream()
+        .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
+        .collect(toList());
 
     if (config.getBloomIndexPruneByRanges()) {
       // also obtain file ranges, if range pruning is enabled
@@ -312,21 +304,6 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
             .collect(Collectors.toList()).iterator());
   }
 
-  HoodieRecord<T> getTaggedRecord(HoodieRecord<T> inputRecord, Option<HoodieRecordLocation> location) {
-    HoodieRecord<T> record = inputRecord;
-    if (location.isPresent()) {
-      // When you have a record in multiple files in the same partition, then rowKeyRecordPairRDD
-      // will have 2 entries with the same exact in memory copy of the HoodieRecord and the 2
-      // separate filenames that the record is found in. This will result in setting
-      // currentLocation 2 times and it will fail the second time. So creating a new in memory
-      // copy of the hoodie record.
-      record = new HoodieRecord<>(inputRecord);
-      record.unseal();
-      record.setCurrentLocation(location.get());
-      record.seal();
-    }
-    return record;
-  }
 
   /**
    * Tag the <rowKey, filename> back to the original HoodieRecord RDD.
@@ -338,7 +315,7 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
     // Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null),
     // so we do left outer join.
     return keyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD).values()
-        .map(v1 -> getTaggedRecord(v1._1, Option.ofNullable(v1._2.orNull())));
+        .map(v1 -> HoodieIndexUtils.getTaggedRecord(v1._1, Option.ofNullable(v1._2.orNull())));
   }
 
   @Override
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
index a35d32f..1e57a38 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.spark.api.java.JavaPairRDD;
@@ -125,17 +126,17 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
           HoodieRecord<T> emptyRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
               new EmptyHoodieRecordPayload());
           // Tag the incoming record for inserting to the new partition
-          HoodieRecord<T> taggedRecord = getTaggedRecord(hoodieRecord, Option.empty());
+          HoodieRecord<T> taggedRecord = HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty());
           return Arrays.asList(emptyRecord, taggedRecord).iterator();
         } else {
           // Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
           // When it differs, the record will still be updated at its old partition.
           return Collections.singletonList(
-              getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()),
+              (HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()),
                   Option.ofNullable(recordLocationHoodieKeyPair.get()._1))).iterator();
         }
       } else {
-        return Collections.singletonList(getTaggedRecord(hoodieRecord, Option.empty())).iterator();
+        return Collections.singletonList((HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(hoodieRecord, Option.empty())).iterator();
       }
     });
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
new file mode 100644
index 0000000..bb1d8d6
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieGlobalSimpleIndex.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.simple;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
+
+/**
+ * A global simple index that reads the fields of interest (record key and partition path) from base files and
+ * joins them with incoming records to find the tagged location.
+ *
+ * @param <T>
+ */
+public class HoodieGlobalSimpleIndex<T extends HoodieRecordPayload> extends HoodieSimpleIndex<T> {
+
+  public HoodieGlobalSimpleIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
+                                              HoodieTable<T> hoodieTable) {
+    return tagLocationInternal(recordRDD, jsc, hoodieTable);
+  }
+
+  /**
+   * Tags records location for incoming records.
+   *
+   * @param inputRecordRDD   {@link JavaRDD} of incoming records
+   * @param jsc         instance of {@link JavaSparkContext} to use
+   * @param hoodieTable instance of {@link HoodieTable} to use
+   * @return {@link JavaRDD} of records with record locations set
+   */
+  protected JavaRDD<HoodieRecord<T>> tagLocationInternal(JavaRDD<HoodieRecord<T>> inputRecordRDD, JavaSparkContext jsc,
+                                                         HoodieTable<T> hoodieTable) {
+
+    JavaPairRDD<String, HoodieRecord<T>> keyedInputRecordRDD = inputRecordRDD.mapToPair(entry -> new Tuple2<>(entry.getRecordKey(), entry));
+    JavaPairRDD<HoodieKey, HoodieRecordLocation> allRecordLocationsInTable = fetchAllRecordLocations(jsc, hoodieTable,
+        config.getGlobalSimpleIndexParallelism());
+    return getTaggedRecords(keyedInputRecordRDD, allRecordLocationsInTable);
+  }
+
+  /**
+   * Fetch record locations for passed in {@link HoodieKey}s.
+   *
+   * @param jsc         instance of {@link JavaSparkContext} to use
+   * @param hoodieTable instance of {@link HoodieTable} of interest
+   * @param parallelism parallelism to use
+   * @return {@link JavaPairRDD} of {@link HoodieKey} and {@link HoodieRecordLocation}
+   */
+  protected JavaPairRDD<HoodieKey, HoodieRecordLocation> fetchAllRecordLocations(JavaSparkContext jsc,
+                                                                                 HoodieTable hoodieTable,
+                                                                                 int parallelism) {
+    List<Pair<String, HoodieBaseFile>> latestBaseFiles = getAllBaseFilesInTable(jsc, hoodieTable);
+    return fetchRecordLocations(jsc, hoodieTable, parallelism, latestBaseFiles);
+  }
+
+  /**
+   * Load the latest base files for all partitions as a list of <partition path, base file> pairs.
+   */
+  protected List<Pair<String, HoodieBaseFile>> getAllBaseFilesInTable(final JavaSparkContext jsc, final HoodieTable hoodieTable) {
+    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+    try {
+      List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(metaClient.getFs(), metaClient.getBasePath(), config.shouldAssumeDatePartitioning());
+      // Obtain the latest data files from all the partitions.
+      return getLatestBaseFilesForAllPartitions(allPartitionPaths, jsc, hoodieTable);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to load all partitions", e);
+    }
+  }
+
+  /**
+   * Tag records with right {@link HoodieRecordLocation}.
+   *
+   * @param incomingRecords incoming {@link HoodieRecord}s
+   * @param existingRecords existing records with {@link HoodieRecordLocation}s
+   * @return {@link JavaRDD} of {@link HoodieRecord}s with tagged {@link HoodieRecordLocation}s
+   */
+  private JavaRDD<HoodieRecord<T>> getTaggedRecords(JavaPairRDD<String, HoodieRecord<T>> incomingRecords, JavaPairRDD<HoodieKey, HoodieRecordLocation> existingRecords) {
+    JavaPairRDD<String, Pair<String, HoodieRecordLocation>> existingRecordByRecordKey = existingRecords
+        .mapToPair(entry -> new Tuple2<>(entry._1.getRecordKey(), Pair.of(entry._1.getPartitionPath(), entry._2)));
+
+    return incomingRecords.leftOuterJoin(existingRecordByRecordKey).values()
+        .flatMap(entry -> {
+          HoodieRecord<T> inputRecord = entry._1;
+          Option<Pair<String, HoodieRecordLocation>> partitionPathLocationPair = Option.ofNullable(entry._2.orNull());
+          List<HoodieRecord<T>> taggedRecords;
+
+          if (partitionPathLocationPair.isPresent()) {
+            String partitionPath = partitionPathLocationPair.get().getKey();
+            HoodieRecordLocation location = partitionPathLocationPair.get().getRight();
+            if (config.getGlobalSimpleIndexUpdatePartitionPath() && !(inputRecord.getPartitionPath().equals(partitionPath))) {
+              // Create an empty record to delete the record in the old partition
+              HoodieRecord<T> emptyRecord = new HoodieRecord(new HoodieKey(inputRecord.getRecordKey(), partitionPath), new EmptyHoodieRecordPayload());
+              // Tag the incoming record for inserting to the new partition
+              HoodieRecord<T> taggedRecord = (HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty());
+              taggedRecords = Arrays.asList(emptyRecord, taggedRecord);
+            } else {
+              // Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
+              // When it differs, the record will still be updated at its old partition.
+              HoodieRecord<T> newRecord = new HoodieRecord<>(new HoodieKey(inputRecord.getRecordKey(), partitionPath), inputRecord.getData());
+              taggedRecords = Collections.singletonList((HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(newRecord, Option.ofNullable(location)));
+            }
+          } else {
+            taggedRecords = Collections.singletonList((HoodieRecord<T>) HoodieIndexUtils.getTaggedRecord(inputRecord, Option.empty()));
+          }
+          return taggedRecords.iterator();
+        });
+  }
+
+  /**
+   * Returns an RDD mapping each HoodieKey to the partitionPath/fileID that contains it; Option.empty() if the key is
+   * not found.
+   *
+   * @param hoodieKeys  keys to lookup
+   * @param jsc         spark context
+   * @param hoodieTable hoodie table object
+   */
+  @Override
+  public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
+                                                                                  JavaSparkContext jsc,
+                                                                                  HoodieTable<T> hoodieTable) {
+    return fetchRecordLocationInternal(hoodieKeys, jsc, hoodieTable, config.getGlobalSimpleIndexParallelism());
+  }
+
+  @Override
+  public boolean isGlobal() {
+    return true;
+  }
+}
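
To illustrate the update-partition-path behaviour implemented in getTaggedRecords above, a hedged configuration sketch (values are illustrative, basePath is assumed):

    // Enable the global variant with partition-path updates.
    HoodieWriteConfig globalConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.GLOBAL_SIMPLE)
            .withGlobalSimpleIndexParallelism(200)            // hoodie.global.simple.index.parallelism
            .withGlobalSimpleIndexUpdatePartitionPath(true)   // hoodie.simple.index.update.partition.path
            .build())
        .build();
    // With the flag enabled, a record whose partition path changed is tagged for insert into the
    // new partition while an EmptyHoodieRecordPayload delete is emitted for the old partition.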
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
new file mode 100644
index 0000000..af963aa
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/index/simple/HoodieSimpleIndex.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.simple;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.utils.SparkConfigUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLocationFetchHandle;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.List;
+
+import scala.Tuple2;
+
+import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
+
+/**
+ * A simple index that reads the fields of interest (record key and partition path) from base files and
+ * joins them with incoming records to find the tagged location.
+ *
+ * @param <T>
+ */
+public class HoodieSimpleIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
+
+  public HoodieSimpleIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
+                                             HoodieTable<T> hoodieTable) {
+    return writeStatusRDD;
+  }
+
+  @Override
+  public boolean rollbackCommit(String commitTime) {
+    return true;
+  }
+
+  @Override
+  public boolean isGlobal() {
+    return false;
+  }
+
+  @Override
+  public boolean canIndexLogFiles() {
+    return false;
+  }
+
+  @Override
+  public boolean isImplicitWithStorage() {
+    return true;
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
+                                              HoodieTable<T> hoodieTable) {
+    return tagLocationInternal(recordRDD, jsc, hoodieTable);
+  }
+
+  /**
+   * Returns an RDD mapping each HoodieKey to the partitionPath/fileID that contains it; Option.empty() if the key is
+   * not found.
+   *
+   * @param hoodieKeys  keys to lookup
+   * @param jsc         spark context
+   * @param hoodieTable hoodie table object
+   */
+  @Override
+  public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
+                                                                                  JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
+
+    return fetchRecordLocationInternal(hoodieKeys, jsc, hoodieTable, config.getSimpleIndexParallelism());
+  }
+
+  /**
+   * Tags records location for incoming records.
+   *
+   * @param inputRecordRDD {@link JavaRDD} of incoming records
+   * @param jsc            instance of {@link JavaSparkContext} to use
+   * @param hoodieTable    instance of {@link HoodieTable} to use
+   * @return {@link JavaRDD} of records with record locations set
+   */
+  protected JavaRDD<HoodieRecord<T>> tagLocationInternal(JavaRDD<HoodieRecord<T>> inputRecordRDD, JavaSparkContext jsc,
+                                                         HoodieTable<T> hoodieTable) {
+    if (config.getSimpleIndexUseCaching()) {
+      inputRecordRDD.persist(SparkConfigUtils.getSimpleIndexInputStorageLevel(config.getProps()));
+    }
+
+    JavaPairRDD<HoodieKey, HoodieRecord<T>> keyedInputRecordRDD = inputRecordRDD.mapToPair(record -> new Tuple2<>(record.getKey(), record));
+    JavaPairRDD<HoodieKey, HoodieRecordLocation> existingLocationsOnTable = fetchRecordLocationsForAffectedPartitions(keyedInputRecordRDD.keys(), jsc, hoodieTable,
+        config.getSimpleIndexParallelism());
+
+    JavaRDD<HoodieRecord<T>> taggedRecordRDD = keyedInputRecordRDD.leftOuterJoin(existingLocationsOnTable)
+        .map(entry -> {
+          final HoodieRecord<T> untaggedRecord = entry._2._1;
+          final Option<HoodieRecordLocation> location = Option.ofNullable(entry._2._2.orNull());
+          return HoodieIndexUtils.getTaggedRecord(untaggedRecord, location);
+        });
+
+    if (config.getSimpleIndexUseCaching()) {
+      inputRecordRDD.unpersist();
+    }
+    return taggedRecordRDD;
+  }
+
+  /**
+   * Fetch record locations for passed in {@link JavaRDD} of HoodieKeys.
+   *
+   * @param lookupKeys  {@link JavaRDD} of {@link HoodieKey}s
+   * @param jsc         instance of {@link JavaSparkContext} to use
+   * @param hoodieTable instance of {@link HoodieTable} of interest
+   * @param parallelism parallelism to use
+   * @return {@link HoodieKey}s mapped to their partition path and fileId
+   */
+  JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocationInternal(JavaRDD<HoodieKey> lookupKeys, JavaSparkContext jsc,
+                                                                                   HoodieTable<T> hoodieTable, int parallelism) {
+    JavaPairRDD<HoodieKey, Option<HoodieRecordLocation>> keyLocationsRDD = lookupKeys.mapToPair(key -> new Tuple2<>(key, Option.empty()));
+    JavaPairRDD<HoodieKey, HoodieRecordLocation> existingRecords = fetchRecordLocationsForAffectedPartitions(lookupKeys, jsc, hoodieTable, parallelism);
+
+    return keyLocationsRDD.leftOuterJoin(existingRecords)
+        .mapToPair(entry -> {
+          final Option<HoodieRecordLocation> locationOpt = Option.ofNullable(entry._2._2.orNull());
+          final HoodieKey key = entry._1;
+          return locationOpt
+              .map(location -> new Tuple2<>(key, Option.of(Pair.of(key.getPartitionPath(), location.getFileId()))))
+              .orElse(new Tuple2<>(key, Option.empty()));
+        });
+  }
+
+  /**
+   * Fetch record locations for passed in {@link HoodieKey}s.
+   *
+   * @param hoodieKeys  {@link JavaRDD} of {@link HoodieKey}s for which locations are fetched
+   * @param jsc         instance of {@link JavaSparkContext} to use
+   * @param hoodieTable instance of {@link HoodieTable} of interest
+   * @param parallelism parallelism to use
+   * @return {@link JavaPairRDD} of {@link HoodieKey} and {@link HoodieRecordLocation}
+   */
+  protected JavaPairRDD<HoodieKey, HoodieRecordLocation> fetchRecordLocationsForAffectedPartitions(JavaRDD<HoodieKey> hoodieKeys, JavaSparkContext jsc, HoodieTable<T> hoodieTable,
+                                                                                                   int parallelism) {
+    List<String> affectedPartitionPathList = hoodieKeys.map(HoodieKey::getPartitionPath).distinct().collect();
+    List<Pair<String, HoodieBaseFile>> latestBaseFiles = getLatestBaseFilesForAllPartitions(affectedPartitionPathList, jsc, hoodieTable);
+    return fetchRecordLocations(jsc, hoodieTable, parallelism, latestBaseFiles);
+  }
+
+  protected JavaPairRDD<HoodieKey, HoodieRecordLocation> fetchRecordLocations(JavaSparkContext jsc, HoodieTable<T> hoodieTable, int parallelism, List<Pair<String, HoodieBaseFile>> baseFiles) {
+    int fetchParallelism = Math.max(1, Math.max(baseFiles.size(), parallelism));
+    return jsc.parallelize(baseFiles, fetchParallelism)
+        .flatMapToPair(partitionPathBaseFile -> new HoodieKeyLocationFetchHandle(config, hoodieTable, partitionPathBaseFile).locations());
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
new file mode 100644
index 0000000..3aa1398
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.Iterator;
+
+import scala.Tuple2;
+
+/**
+ * {@link HoodieRecordLocation} fetch handle for all records from {@link HoodieBaseFile} of interest.
+ *
+ * @param <T>
+ */
+public class HoodieKeyLocationFetchHandle<T extends HoodieRecordPayload> extends HoodieReadHandle<T> {
+
+  private final Pair<String, HoodieBaseFile> partitionPathBaseFilePair;
+
+  public HoodieKeyLocationFetchHandle(HoodieWriteConfig config, HoodieTable<T> hoodieTable,
+                                      Pair<String, HoodieBaseFile> partitionPathBaseFilePair) {
+    super(config, null, hoodieTable, Pair.of(partitionPathBaseFilePair.getLeft(), partitionPathBaseFilePair.getRight().getFileId()));
+    this.partitionPathBaseFilePair = partitionPathBaseFilePair;
+  }
+
+  public Iterator<Tuple2<HoodieKey, HoodieRecordLocation>> locations() {
+    HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
+    return ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hoodieTable.getHadoopConf(), new Path(baseFile.getPath())).stream()
+        .map(entry -> new Tuple2<>(entry,
+            new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()))).iterator();
+  }
+}
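
A short sketch of how the fetch handle is driven (HoodieSimpleIndex.fetchRecordLocations above uses it the same way inside a Spark flatMapToPair; config, hoodieTable, partitionPath and baseFile are assumed to be in scope):

    // Enumerate every (HoodieKey, HoodieRecordLocation) pair stored in one base file.
    HoodieKeyLocationFetchHandle fetchHandle =
        new HoodieKeyLocationFetchHandle(config, hoodieTable, Pair.of(partitionPath, baseFile));
    Iterator<Tuple2<HoodieKey, HoodieRecordLocation>> locations = fetchHandle.locations();
    while (locations.hasNext()) {
      Tuple2<HoodieKey, HoodieRecordLocation> entry = locations.next();
      // entry._1 is the key (record key + partition path) read from the parquet file;
      // entry._2 carries the base file's commit time and fileId.
    }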
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
index 9e93da3..58a7031 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
@@ -18,15 +18,30 @@
 
 package org.apache.hudi.index;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.HoodieClientTestHarness;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.TestRawTripPayload;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieHBaseIndexConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIndexException;
@@ -34,63 +49,126 @@ import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.index.bloom.HoodieBloomIndex;
 import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex;
 import org.apache.hudi.index.hbase.HBaseIndex;
+import org.apache.hudi.index.simple.HoodieSimpleIndex;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+import scala.Tuple2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieIndex extends HoodieClientTestHarness {
 
-  private HoodieWriteConfig.Builder clientConfigBuilder;
-  private HoodieIndexConfig.Builder indexConfigBuilder;
+  private final Random random = new Random();
+  private IndexType indexType;
+  private HoodieIndex index;
+  private HoodieWriteConfig config;
+  private HoodieWriteClient writeClient;
+  private String schemaStr;
+  private Schema schema;
+
+  private void setUp(IndexType indexType) throws Exception {
+    setUp(indexType, true);
+  }
 
-  @BeforeEach
-  public void setUp() throws Exception {
+  private void setUp(IndexType indexType, boolean initializeIndex) throws Exception {
+    this.indexType = indexType;
     initSparkContexts("TestHoodieIndex");
     initPath();
+    initTestDataGenerator();
+    initFileSystem();
+    // We have some records to be tagged (two different partitions)
+    schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
+    schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
     initMetaClient();
-    clientConfigBuilder = HoodieWriteConfig.newBuilder();
-    indexConfigBuilder = HoodieIndexConfig.newBuilder();
+    if (initializeIndex) {
+      instantiateIndex();
+    }
   }
 
   @AfterEach
-  public void tearDown() {
+  public void tearDown() throws IOException {
     cleanupSparkContexts();
+    cleanupFileSystem();
     cleanupMetaClient();
   }
 
-  @Test
-  public void testCreateIndex() {
-    // Different types
-    HoodieWriteConfig config = clientConfigBuilder.withPath(basePath)
-        .withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.HBASE)
-            .withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder().build()).build())
-        .build();
-    assertTrue(HoodieIndex.createIndex(config) instanceof HBaseIndex);
-    config = clientConfigBuilder.withPath(basePath)
-        .withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
-    assertTrue(HoodieIndex.createIndex(config) instanceof InMemoryHashIndex);
-    config = clientConfigBuilder.withPath(basePath)
-        .withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
-    assertTrue(HoodieIndex.createIndex(config) instanceof HoodieBloomIndex);
-    config = clientConfigBuilder.withPath(basePath)
-        .withIndexConfig(indexConfigBuilder.withIndexType(IndexType.GLOBAL_BLOOM).build()).build();
-    assertTrue(HoodieIndex.createIndex(config) instanceof HoodieGlobalBloomIndex);
+  @ParameterizedTest
+  @EnumSource(value = IndexType.class, names = {"BLOOM", "GLOBAL_BLOOM", "SIMPLE", "GLOBAL_SIMPLE", "HBASE"})
+  public void testCreateIndex(IndexType indexType) throws Exception {
+    setUp(indexType, false);
+    HoodieWriteConfig.Builder clientConfigBuilder = HoodieWriteConfig.newBuilder();
+    HoodieIndexConfig.Builder indexConfigBuilder = HoodieIndexConfig.newBuilder();
+    switch (indexType) {
+      case INMEMORY:
+        config = clientConfigBuilder.withPath(basePath)
+            .withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
+        assertTrue(HoodieIndex.createIndex(config) instanceof InMemoryHashIndex);
+        break;
+      case BLOOM:
+        config = clientConfigBuilder.withPath(basePath)
+            .withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
+        assertTrue(HoodieIndex.createIndex(config) instanceof HoodieBloomIndex);
+        break;
+      case GLOBAL_BLOOM:
+        config = clientConfigBuilder.withPath(basePath)
+            .withIndexConfig(indexConfigBuilder.withIndexType(IndexType.GLOBAL_BLOOM).build()).build();
+        assertTrue(HoodieIndex.createIndex(config) instanceof HoodieGlobalBloomIndex);
+        break;
+      case SIMPLE:
+        config = clientConfigBuilder.withPath(basePath)
+            .withIndexConfig(indexConfigBuilder.withIndexType(IndexType.SIMPLE).build()).build();
+        assertTrue(HoodieIndex.createIndex(config) instanceof HoodieSimpleIndex);
+        break;
+      case HBASE:
+        config = clientConfigBuilder.withPath(basePath)
+            .withIndexConfig(indexConfigBuilder.withIndexType(HoodieIndex.IndexType.HBASE)
+                .withHBaseIndexConfig(new HoodieHBaseIndexConfig.Builder().build()).build())
+            .build();
+        assertTrue(HoodieIndex.createIndex(config) instanceof HBaseIndex);
+        break;
+      default:
+        // no-op; default branch present only to satisfy checkstyle
+    }
+  }
 
+  @Test
+  public void testCreateDummyIndex() throws Exception {
+    setUp(IndexType.BLOOM, false);
+    HoodieWriteConfig.Builder clientConfigBuilder = HoodieWriteConfig.newBuilder();
+    HoodieIndexConfig.Builder indexConfigBuilder = HoodieIndexConfig.newBuilder();
     config = clientConfigBuilder.withPath(basePath)
         .withIndexConfig(indexConfigBuilder.withIndexClass(DummyHoodieIndex.class.getName()).build()).build();
     assertTrue(HoodieIndex.createIndex(config) instanceof DummyHoodieIndex);
   }
 
   @Test
-  public void testCreateIndex_withException() {
+  public void testCreateIndex_withException() throws Exception {
+    setUp(IndexType.BLOOM, false);
+    HoodieWriteConfig.Builder clientConfigBuilder = HoodieWriteConfig.newBuilder();
+    HoodieIndexConfig.Builder indexConfigBuilder = HoodieIndexConfig.newBuilder();
     final HoodieWriteConfig config1 = clientConfigBuilder.withPath(basePath)
         .withIndexConfig(indexConfigBuilder.withIndexClass(IndexWithConstructor.class.getName()).build()).build();
     final Throwable thrown1 = assertThrows(HoodieException.class, () -> {
@@ -106,6 +184,385 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
     assertTrue(thrown2.getMessage().contains("Unable to instantiate class"));
   }
 
+  @ParameterizedTest
+  @EnumSource(value = IndexType.class, names = {"BLOOM", "GLOBAL_BLOOM", "SIMPLE", "GLOBAL_SIMPLE"})
+  public void testSimpleTagLocationAndUpdate(IndexType indexType) throws Exception {
+    setUp(indexType);
+    String newCommitTime = "001";
+    int totalRecords = 10 + random.nextInt(20);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
+
+    // Test tagLocation without any entries in index
+    JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
+    assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
+
+    // Insert totalRecords records
+    writeClient.startCommitWithTime(newCommitTime);
+    JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
+    assertNoWriteErrors(writeStatues.collect());
+
+    // Now tagLocation for these records; the index should not tag them since the commit
+    // has not been completed yet
+    javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
+    assert (javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 0);
+    // Now commit this & update location of records inserted and validate no errors
+    writeClient.commit(newCommitTime, writeStatues);
+    // Now tagLocation for these records, index should tag them correctly
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
+    javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
+    Map<String, String> recordKeyToPartitionPathMap = new HashMap();
+    List<HoodieRecord> hoodieRecords = writeRecords.collect();
+    hoodieRecords.forEach(entry -> recordKeyToPartitionPathMap.put(entry.getRecordKey(), entry.getPartitionPath()));
+
+    assertEquals(totalRecords, javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size());
+    assertEquals(totalRecords, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
+    assertEquals(totalRecords, javaRDD.filter(record -> (record.getCurrentLocation() != null
+        && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
+    javaRDD.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry.getRecordKey()), entry.getPartitionPath(), "PartitionPath mismatch"));
+
+    JavaRDD<HoodieKey> hoodieKeyJavaRDD = writeRecords.map(entry -> entry.getKey());
+    JavaPairRDD<HoodieKey, Option<Pair<String, String>>> recordLocations = index.fetchRecordLocation(hoodieKeyJavaRDD, jsc, hoodieTable);
+    List<HoodieKey> hoodieKeys = hoodieKeyJavaRDD.collect();
+    assertEquals(totalRecords, recordLocations.collect().size());
+    assertEquals(totalRecords, recordLocations.map(record -> record._1).distinct().count());
+    recordLocations.foreach(entry -> assertTrue(hoodieKeys.contains(entry._1), "Missing HoodieKey"));
+    recordLocations.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), entry._1.getPartitionPath(), "PartitionPath mismatch"));
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = IndexType.class, names = {"BLOOM", "GLOBAL_BLOOM", "SIMPLE", "GLOBAL_SIMPLE"})
+  public void testTagLocationAndDuplicateUpdate(IndexType indexType) throws Exception {
+    setUp(indexType);
+    String newCommitTime = "001";
+    int totalRecords = 10 + random.nextInt(20);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
+
+    writeClient.startCommitWithTime(newCommitTime);
+    JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
+    JavaRDD<HoodieRecord> javaRDD1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+
+    // Duplicate upsert and ensure correctness is maintained
+    // We are approximately imitating the case where the RDD is recomputed. When an RDD is recomputed, the driver-side
+    // code (including the timeline state transitions) is not re-run, so we delete the inflight instant to ensure the
+    // subsequent upsert does not run into conflicts.
+    metaClient.getFs().delete(new Path(metaClient.getMetaPath(), "001.inflight"));
+
+    writeClient.upsert(writeRecords, newCommitTime);
+    assertNoWriteErrors(writeStatues.collect());
+
+    // Now commit this and update the location of the inserted records, validating there are no errors
+    writeClient.commit(newCommitTime, writeStatues);
+    // Now tagLocation for these records; the index should tag them correctly
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
+    JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
+
+    Map<String, String> recordKeyToPartitionPathMap = new HashMap<>();
+    List<HoodieRecord> hoodieRecords = writeRecords.collect();
+    hoodieRecords.forEach(entry -> recordKeyToPartitionPathMap.put(entry.getRecordKey(), entry.getPartitionPath()));
+
+    assertEquals(totalRecords, javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
+    assertEquals(totalRecords, javaRDD.map(record -> record.getKey().getRecordKey()).distinct().count());
+    assertEquals(totalRecords, javaRDD.filter(record -> (record.getCurrentLocation() != null
+        && record.getCurrentLocation().getInstantTime().equals(newCommitTime))).distinct().count());
+    javaRDD.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry.getRecordKey()), entry.getPartitionPath(), "PartitionPath mismatch"));
+
+    JavaRDD<HoodieKey> hoodieKeyJavaRDD = writeRecords.map(entry -> entry.getKey());
+    JavaPairRDD<HoodieKey, Option<Pair<String, String>>> recordLocations = index.fetchRecordLocation(hoodieKeyJavaRDD, jsc, hoodieTable);
+    List<HoodieKey> hoodieKeys = hoodieKeyJavaRDD.collect();
+    assertEquals(totalRecords, recordLocations.collect().size());
+    assertEquals(totalRecords, recordLocations.map(record -> record._1).distinct().count());
+    recordLocations.foreach(entry -> assertTrue(hoodieKeys.contains(entry._1), "Missing HoodieKey"));
+    recordLocations.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), entry._1.getPartitionPath(), "PartitionPath mismatch"));
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = IndexType.class, names = {"BLOOM", "GLOBAL_BLOOM", "SIMPLE", "GLOBAL_SIMPLE"})
+  public void testSimpleTagLocationAndUpdateWithRollback(IndexType indexType) throws Exception {
+    setUp(indexType);
+    String newCommitTime = writeClient.startCommit();
+    int totalRecords = 20 + random.nextInt(20);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, totalRecords);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    // Insert the records
+    JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
+    assertNoWriteErrors(writeStatues.collect());
+
+    // commit this upsert
+    writeClient.commit(newCommitTime, writeStatues);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
+
+    // Now tagLocation for these records; the index should tag them
+    JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
+    assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == totalRecords);
+
+    // check tagged records are tagged with correct fileIds
+    List<String> fileIds = writeStatues.map(WriteStatus::getFileId).collect();
+    assert (javaRDD.filter(record -> record.getCurrentLocation().getFileId() == null).collect().size() == 0);
+    List<String> taggedFileIds = javaRDD.map(record -> record.getCurrentLocation().getFileId()).distinct().collect();
+
+    Map<String, String> recordKeyToPartitionPathMap = new HashMap<>();
+    List<HoodieRecord> hoodieRecords = writeRecords.collect();
+    hoodieRecords.forEach(entry -> recordKeyToPartitionPathMap.put(entry.getRecordKey(), entry.getPartitionPath()));
+
+    JavaRDD<HoodieKey> hoodieKeyJavaRDD = writeRecords.map(entry -> entry.getKey());
+    JavaPairRDD<HoodieKey, Option<Pair<String, String>>> recordLocations = index.fetchRecordLocation(hoodieKeyJavaRDD, jsc, hoodieTable);
+    List<HoodieKey> hoodieKeys = hoodieKeyJavaRDD.collect();
+    assertEquals(totalRecords, recordLocations.collect().size());
+    assertEquals(totalRecords, recordLocations.map(record -> record._1).distinct().count());
+    recordLocations.foreach(entry -> assertTrue(hoodieKeys.contains(entry._1), "Missing HoodieKey"));
+    recordLocations.foreach(entry -> assertEquals(recordKeyToPartitionPathMap.get(entry._1.getRecordKey()), entry._1.getPartitionPath(), "PartitionPath mismatch"));
+
+    // both lists should match
+    assertTrue(taggedFileIds.containsAll(fileIds) && fileIds.containsAll(taggedFileIds));
+    // Rollback the last commit
+    writeClient.rollback(newCommitTime);
+
+    hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
+    // Now tagLocation for these records; the index should not tag them since the commit
+    // was rolled back
+    javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
+    assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == 0);
+    assert (javaRDD.filter(record -> record.getCurrentLocation() != null).collect().size() == 0);
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = IndexType.class, names = {"BLOOM", "SIMPLE",})
+  public void testTagLocationAndFetchRecordLocations(IndexType indexType) throws Exception {
+    setUp(indexType);
+    String rowKey1 = UUID.randomUUID().toString();
+    String rowKey2 = UUID.randomUUID().toString();
+    String rowKey3 = UUID.randomUUID().toString();
+    String recordStr1 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
+    String recordStr2 = "{\"_row_key\":\"" + rowKey2 + "\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
+    String recordStr3 = "{\"_row_key\":\"" + rowKey3 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
+    // place same row key under a different partition.
+    String recordStr4 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}";
+    TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1);
+    HoodieRecord record1 =
+        new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1);
+    TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2);
+    HoodieRecord record2 =
+        new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2);
+    TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3);
+    HoodieRecord record3 =
+        new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3);
+    TestRawTripPayload rowChange4 = new TestRawTripPayload(recordStr4);
+    HoodieRecord record4 =
+        new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4);
+    JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4));
+
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
+
+    JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, hoodieTable);
+
+    // Should not find any files
+    for (HoodieRecord record : taggedRecordRDD.collect()) {
+      assertFalse(record.isCurrentLocationKnown());
+    }
+
+    // We create three parquet files, each having one record (across two different partitions)
+    String filename1 =
+        HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record1), schema, null, true);
+    String filename2 =
+        HoodieClientTestUtils.writeParquetFile(basePath, "2016/01/31", Collections.singletonList(record2), schema, null, true);
+    String filename3 =
+        HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Collections.singletonList(record4), schema, null, true);
+
+    // We do the tag again
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
+
+    taggedRecordRDD = index.tagLocation(recordRDD, jsc, hoodieTable);
+
+    // Check results
+    for (HoodieRecord record : taggedRecordRDD.collect()) {
+      if (record.getRecordKey().equals(rowKey1)) {
+        if (record.getPartitionPath().equals("2015/01/31")) {
+          assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename3));
+        } else {
+          assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename1));
+        }
+      } else if (record.getRecordKey().equals(rowKey2)) {
+        assertEquals(record.getCurrentLocation().getFileId(), FSUtils.getFileId(filename2));
+      } else if (record.getRecordKey().equals(rowKey3)) {
+        assertFalse(record.isCurrentLocationKnown());
+      }
+    }
+
+    JavaPairRDD<HoodieKey, Option<Pair<String, String>>> recordLocations = index.fetchRecordLocation(recordRDD.map(entry -> entry.getKey()), jsc, hoodieTable);
+
+    for (Tuple2<HoodieKey, Option<Pair<String, String>>> entry : recordLocations.collect()) {
+      if (entry._1.getRecordKey().equals(rowKey1)) {
+        assertTrue(entry._2.isPresent(), "Row1 should have been present ");
+        if (entry._1.getPartitionPath().equals("2015/01/31")) {
+          assertTrue(entry._2.isPresent(), "Row1 should have been present ");
+          assertEquals(entry._2.get().getRight(), FSUtils.getFileId(filename3));
+        } else {
+          assertEquals(entry._2.get().getRight(), FSUtils.getFileId(filename1));
+        }
+      } else if (entry._1.getRecordKey().equals(rowKey2)) {
+        assertTrue(entry._2.isPresent(), "Row2 should have been present ");
+        assertEquals(entry._2.get().getRight(), FSUtils.getFileId(filename2));
+      } else if (entry._1.getRecordKey().equals(rowKey3)) {
+        assertFalse(entry._2.isPresent(), "Row3 should have been absent ");
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = IndexType.class, names = {"GLOBAL_SIMPLE"})
+  public void testSimpleGlobalIndexTagLocationWhenShouldUpdatePartitionPath(IndexType indexType) throws Exception {
+    setUp(indexType);
+    config = getConfigBuilder()
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+            .withGlobalSimpleIndexUpdatePartitionPath(true)
+            .withBloomIndexUpdatePartitionPath(true)
+            .build()).build();
+    writeClient = getHoodieWriteClient(config);
+    index = writeClient.getIndex();
+
+    // Create the original partition "2016/01/31" along with its partition metadata file;
+    // the original record itself is written into it further below
+    new File(basePath + "/2016/01/31").mkdirs();
+    new File(basePath + "/2016/01/31/" + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile();
+
+    // this record will be saved in the table and will later be tagged with an empty payload (for deletion from its old partition)
+    TestRawTripPayload originalPayload =
+        new TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}");
+    HoodieRecord originalRecord =
+        new HoodieRecord(new HoodieKey(originalPayload.getRowKey(), originalPayload.getPartitionPath()),
+            originalPayload);
+
+    /*
+    This record has the same record key as originalRecord but a different time, hence a different partition.
+    Because update-partition-path is enabled for the global index,
+    the global simple index should
+    - tag the original partition of the originalRecord with an empty record for deletion, and
+    - tag the new partition of the incomingRecord
+    */
+    TestRawTripPayload incomingPayload =
+        new TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-31T03:16:41.415Z\",\"number\":12}");
+    HoodieRecord incomingRecord =
+        new HoodieRecord(new HoodieKey(incomingPayload.getRowKey(), incomingPayload.getPartitionPath()),
+            incomingPayload);
+    /*
+    This record has the same record key as originalRecord and the same partition.
+    Even though update-partition-path is enabled,
+    the global simple index should just tag the original partition
+    */
+    TestRawTripPayload incomingPayloadSamePartition =
+        new TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T04:16:41.415Z\",\"number\":15}");
+    HoodieRecord incomingRecordSamePartition =
+        new HoodieRecord(
+            new HoodieKey(incomingPayloadSamePartition.getRowKey(), incomingPayloadSamePartition.getPartitionPath()),
+            incomingPayloadSamePartition);
+
+    // Write the original record into the table so the incoming records can be tagged against it
+    String schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
+    Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
+
+    HoodieClientTestUtils
+        .writeParquetFile(basePath, "2016/01/31", Collections.singletonList(originalRecord), schema, null, false);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
+
+    // Ensure the .hoodie meta directory exists
+    new File(basePath + "/.hoodie").mkdirs();
+
+    // test against incoming record with a different partition
+    JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Collections.singletonList(incomingRecord));
+    JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, jsc, table);
+
+    assertEquals(2, taggedRecordRDD.count());
+    for (HoodieRecord record : taggedRecordRDD.collect()) {
+      switch (record.getPartitionPath()) {
+        case "2016/01/31":
+          assertEquals("000", record.getRecordKey());
+          assertTrue(record.getData() instanceof EmptyHoodieRecordPayload);
+          break;
+        case "2016/02/31":
+          assertEquals("000", record.getRecordKey());
+          assertEquals(incomingPayload.getJsonData(), ((TestRawTripPayload) record.getData()).getJsonData());
+          break;
+        default:
+          assertFalse(true, String.format("Should not get partition path: %s", record.getPartitionPath()));
+      }
+    }
+
+    // test against incoming record with the same partition
+    JavaRDD<HoodieRecord> recordRDDSamePartition = jsc
+        .parallelize(Collections.singletonList(incomingRecordSamePartition));
+    JavaRDD<HoodieRecord> taggedRecordRDDSamePartition = index.tagLocation(recordRDDSamePartition, jsc, table);
+
+    assertEquals(1, taggedRecordRDDSamePartition.count());
+    HoodieRecord record = taggedRecordRDDSamePartition.first();
+    assertEquals("000", record.getRecordKey());
+    assertEquals("2016/01/31", record.getPartitionPath());
+    assertEquals(incomingPayloadSamePartition.getJsonData(), ((TestRawTripPayload) record.getData()).getJsonData());
+  }
+
+  /**
+   * Get Config builder with default configs set.
+   *
+   * @return Config Builder
+   */
+  public HoodieWriteConfig.Builder getConfigBuilder() {
+    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+  }
+
+  HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
+    return getConfigBuilder(schemaStr, indexType);
+  }
+
+  /**
+   * Get Config builder with default configs set.
+   *
+   * @return Config Builder
+   */
+  private HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
+    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
+        .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
+        .withWriteStatusClass(TestRawTripPayload.MetadataMergeWriteStatus.class)
+        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
+        .forTable("test-trip-table")
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
+        .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+  }
+
+  private HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
+    return new HoodieWriteClient(jsc, cfg, false);
+  }
+
+  private void instantiateIndex() {
+    config = getConfigBuilder()
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
+            .build()).withAutoCommit(false).build();
+    writeClient = getHoodieWriteClient(config);
+    this.index = writeClient.getIndex();
+  }
+
+  private void assertNoWriteErrors(List<WriteStatus> statuses) {
+    // Verify there are no errors
+    for (WriteStatus status : statuses) {
+      assertFalse(status.hasErrors());
+    }
+  }
+
   public static class DummyHoodieIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
 
     public DummyHoodieIndex(HoodieWriteConfig config) {
@@ -157,4 +614,5 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
   public static class IndexWithoutConstructor {
 
   }
+
 }
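
The parameterized tests above drive the two new index types purely through configuration. For orientation, here is a minimal sketch of wiring them into a write client using the same builder calls the tests use; the base path is a placeholder and `jsc` is assumed to be an existing JavaSparkContext:

    // Minimal sketch: opting a writer into the new simple index types (placeholder path).
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/test-trip-table")                      // placeholder base path
        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
        .forTable("test-trip-table")
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.SIMPLE)            // or IndexType.GLOBAL_SIMPLE
            .withGlobalSimpleIndexUpdatePartitionPath(true)         // only relevant for GLOBAL_SIMPLE
            .build())
        .build();
    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, cfg, false);
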
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 8b93828..5a85f9c 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -475,5 +475,4 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
       }
     }
   }
-
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
new file mode 100644
index 0000000..50e2127
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.HoodieClientTestHarness;
+import org.apache.hudi.common.HoodieClientTestUtils;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.TestRawTripPayload;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieTestUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import scala.Tuple2;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.hudi.common.HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests {@link HoodieKeyLocationFetchHandle}.
+ */
+public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness {
+
+  private HoodieWriteConfig config;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initSparkContexts("TestRecordFetcher");
+    initPath();
+    initTestDataGenerator();
+    initFileSystem();
+    initMetaClient();
+    config = getConfigBuilder()
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .build()).build();
+  }
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    cleanupSparkContexts();
+    cleanupFileSystem();
+    cleanupMetaClient();
+  }
+
+  @Test
+  public void testFetchHandle() throws Exception {
+
+    String commitTime = "000";
+    List<HoodieRecord> records = dataGen.generateInserts(commitTime, 100);
+    Map<String, List<HoodieRecord>> recordsPerPartiton = getRecordsPerPartition(records);
+
+    Map<Tuple2<String, String>, List<Tuple2<HoodieKey, HoodieRecordLocation>>> expectedList = writeToParquetAndGetExpectedRecordLocations(recordsPerPartiton);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable hoodieTable = HoodieTable.create(metaClient, config, jsc.hadoopConfiguration());
+
+    Files.createDirectories(Paths.get(basePath, ".hoodie"));
+
+    List<Tuple2<String, HoodieBaseFile>> partitionPathFileIdPairs = loadAllFilesForPartitions(new ArrayList<>(recordsPerPartiton.keySet()), jsc, hoodieTable);
+
+    for (Tuple2<String, HoodieBaseFile> entry : partitionPathFileIdPairs) {
+      HoodieKeyLocationFetchHandle fetcherHandle = new HoodieKeyLocationFetchHandle(config, hoodieTable, Pair.of(entry._1, entry._2));
+      Iterator<Tuple2<HoodieKey, HoodieRecordLocation>> result = fetcherHandle.locations();
+      List<Tuple2<HoodieKey, HoodieRecordLocation>> actualList = new ArrayList<>();
+      result.forEachRemaining(actualList::add);
+      assertEquals(expectedList.get(new Tuple2<>(entry._1, entry._2.getFileId())), actualList);
+    }
+  }
+
+  private Map<String, List<HoodieRecord>> getRecordsPerPartition(List<HoodieRecord> records) {
+    Map<String, List<HoodieRecord>> recordsPerPartiton = new HashMap<>();
+    for (HoodieRecord record : records) {
+      if (!recordsPerPartiton.containsKey(record.getPartitionPath())) {
+        recordsPerPartiton.put(record.getPartitionPath(), new ArrayList<>());
+      }
+      recordsPerPartiton.get(record.getPartitionPath()).add(record);
+    }
+    return recordsPerPartiton;
+  }
+
+  private Map<Tuple2<String, String>, List<Tuple2<HoodieKey, HoodieRecordLocation>>> writeToParquetAndGetExpectedRecordLocations(
+      Map<String, List<HoodieRecord>> recordsPerPartiton) throws Exception {
+    Map<Tuple2<String, String>, List<Tuple2<HoodieKey, HoodieRecordLocation>>> expectedList = new HashMap<>();
+    for (Map.Entry<String, List<HoodieRecord>> entry : recordsPerPartiton.entrySet()) {
+      int totalRecordsPerPartition = entry.getValue().size();
+      int totalSlices = 1;
+      if (totalRecordsPerPartition > 5) {
+        totalSlices = totalRecordsPerPartition / 3;
+      }
+      int recordsPerFileSlice = totalRecordsPerPartition / totalSlices;
+
+      List<List<HoodieRecord>> recordsForFileSlices = new ArrayList<>();
+      recordsForFileSlices.add(new ArrayList<>());
+      int index = 0;
+      int count = 0;
+      for (HoodieRecord record : entry.getValue()) {
+        if (count < recordsPerFileSlice) {
+          recordsForFileSlices.get(index).add(record);
+          count++;
+        } else {
+          recordsForFileSlices.add(new ArrayList<>());
+          index++;
+          count = 0;
+        }
+      }
+
+      for (List<HoodieRecord> recordsPerSlice : recordsForFileSlices) {
+        Tuple2<String, String> fileIdInstantTimePair = writeToParquet(entry.getKey(), recordsPerSlice);
+        List<Tuple2<HoodieKey, HoodieRecordLocation>> expectedEntries = new ArrayList<>();
+        for (HoodieRecord record : recordsPerSlice) {
+          expectedEntries.add(new Tuple2<>(record.getKey(), new HoodieRecordLocation(fileIdInstantTimePair._2, fileIdInstantTimePair._1)));
+        }
+        expectedList.put(new Tuple2<>(entry.getKey(), fileIdInstantTimePair._1), expectedEntries);
+      }
+    }
+    return expectedList;
+  }
+
+  protected List<Tuple2<String, HoodieBaseFile>> loadAllFilesForPartitions(List<String> partitions, final JavaSparkContext jsc,
+                                                                           final HoodieTable hoodieTable) {
+
+    // Obtain the latest data files from all the partitions.
+    List<Pair<String, HoodieBaseFile>> partitionPathFileIDList = HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, jsc, hoodieTable);
+    return partitionPathFileIDList.stream()
+        .map(pf -> new Tuple2<>(pf.getKey(), pf.getValue())).collect(toList());
+  }
+
+  /**
+   * Get Config builder with default configs set.
+   *
+   * @return Config Builder
+   */
+  public HoodieWriteConfig.Builder getConfigBuilder() {
+    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+  }
+
+  /**
+   * Get Config builder with default configs set.
+   *
+   * @return Config Builder
+   */
+  private HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
+    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
+        .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
+        .withWriteStatusClass(TestRawTripPayload.MetadataMergeWriteStatus.class)
+        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
+        .forTable("test-trip-table")
+        .withIndexConfig(HoodieIndexConfig.newBuilder().build())
+        .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+  }
+
+  private Tuple2<String, String> writeToParquet(String partitionPath, List<HoodieRecord> records) throws Exception {
+    // Sleep briefly so that successive calls to makeNewCommitTime() pick up distinct commit times
+    Thread.sleep(100);
+    String instantTime = HoodieTestUtils.makeNewCommitTime();
+    String fileId = UUID.randomUUID().toString();
+    String filename = FSUtils.makeDataFileName(instantTime, "1-0-1", fileId);
+    HoodieTestUtils.createCommitFiles(basePath, instantTime);
+    HoodieClientTestUtils.writeParquetFile(basePath, partitionPath, filename, records, AVRO_SCHEMA_WITH_METADATA_FIELDS, null,
+        true);
+    return new Tuple2<>(fileId, instantTime);
+  }
+}
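
The (HoodieKey, HoodieRecordLocation) pairs verified above are the lookup side of the join that the simple index performs against incoming records. The hypothetical helper below (`tagByJoin`) is only a sketch of that idea, not the committed HoodieSimpleIndex code; it assumes `existingLocations` has already been built from HoodieKeyLocationFetchHandle.locations() and that HoodieRecord exposes unseal()/setCurrentLocation()/seal() mutators:

    // Illustrative only: tag incoming records with known locations by joining on HoodieKey.
    static JavaRDD<HoodieRecord> tagByJoin(JavaPairRDD<HoodieKey, HoodieRecord> incoming,
                                           JavaPairRDD<HoodieKey, HoodieRecordLocation> existingLocations) {
      return incoming.leftOuterJoin(existingLocations)
          .map(kv -> {
            HoodieRecord record = kv._2._1;
            if (kv._2._2.isPresent()) {      // the key already exists in some base file
              record.unseal();
              record.setCurrentLocation(kv._2._2.get());
              record.seal();
            }
            return record;                   // unmatched records stay untagged, i.e. new inserts
          });
    }
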
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index bffe8df..8c22122 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -198,6 +198,24 @@ public class HoodieAvroUtils {
     return RECORD_KEY_SCHEMA;
   }
 
+  /**
+   * Fetch schema for record key and partition path.
+   */
+  public static Schema getRecordKeyPartitionPathSchema() {
+    List<Schema.Field> toBeAddedFields = new ArrayList<>();
+    Schema recordSchema = Schema.createRecord("HoodieRecordKey", "", "", false);
+
+    Schema.Field recordKeyField =
+        new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
+    Schema.Field partitionPathField =
+        new Schema.Field(HoodieRecord.PARTITION_PATH_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
+
+    toBeAddedFields.add(recordKeyField);
+    toBeAddedFields.add(partitionPathField);
+    recordSchema.setFields(toBeAddedFields);
+    return recordSchema;
+  }
+
   public static GenericRecord addHoodieKeyToRecord(GenericRecord record, String recordKey, String partitionPath,
       String fileName) {
     record.put(HoodieRecord.FILENAME_METADATA_FIELD, fileName);
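
A quick way to sanity-check the new projection helper added above, JUnit-style (mirroring the assertion style used elsewhere in this patch):

    // The projection carries exactly the two Hoodie meta columns the simple index needs.
    Schema projection = HoodieAvroUtils.getRecordKeyPartitionPathSchema();
    assertEquals(2, projection.getFields().size());
    assertNotNull(projection.getField(HoodieRecord.RECORD_KEY_METADATA_FIELD));
    assertNotNull(projection.getField(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
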
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index 923b174..3c8d9e8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -76,13 +77,27 @@ public class ParquetUtils {
    * @return Set Set of row keys matching candidateRecordKeys
    */
   public static Set<String> filterParquetRowKeys(Configuration configuration, Path filePath, Set<String> filter) {
+    return filterParquetRowKeys(configuration, filePath, filter, HoodieAvroUtils.getRecordKeySchema());
+  }
+
+  /**
+   * Read the rowKey list matching the given filter, from the given parquet file. If the filter is empty, then this will
+   * return all the rowkeys.
+   *
+   * @param filePath      The parquet file path.
+   * @param configuration configuration to build fs object
+   * @param filter        record keys filter
+   * @param readSchema    schema of columns to be read
+   * @return Set Set of row keys matching candidateRecordKeys
+   */
+  private static Set<String> filterParquetRowKeys(Configuration configuration, Path filePath, Set<String> filter,
+                                                  Schema readSchema) {
     Option<RecordKeysFilterFunction> filterFunction = Option.empty();
     if (filter != null && !filter.isEmpty()) {
       filterFunction = Option.of(new RecordKeysFilterFunction(filter));
     }
     Configuration conf = new Configuration(configuration);
     conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
-    Schema readSchema = HoodieAvroUtils.getRecordKeySchema();
     AvroReadSupport.setAvroReadSchema(conf, readSchema);
     AvroReadSupport.setRequestedProjection(conf, readSchema);
     Set<String> rowKeys = new HashSet<>();
@@ -105,6 +120,41 @@ public class ParquetUtils {
     return rowKeys;
   }
 
+  /**
+   * Fetch {@link HoodieKey}s from the given parquet file.
+   *
+   * @param filePath      The parquet file path.
+   * @param configuration configuration to build fs object
+   * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
+   */
+  public static List<HoodieKey> fetchRecordKeyPartitionPathFromParquet(Configuration configuration, Path filePath) {
+    List<HoodieKey> hoodieKeys = new ArrayList<>();
+    try {
+      if (!filePath.getFileSystem(configuration).exists(filePath)) {
+        return new ArrayList<>();
+      }
+
+      Configuration conf = new Configuration(configuration);
+      conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
+      Schema readSchema = HoodieAvroUtils.getRecordKeyPartitionPathSchema();
+      AvroReadSupport.setAvroReadSchema(conf, readSchema);
+      AvroReadSupport.setRequestedProjection(conf, readSchema);
+      ParquetReader reader = AvroParquetReader.builder(filePath).withConf(conf).build();
+      Object obj = reader.read();
+      while (obj != null) {
+        if (obj instanceof GenericRecord) {
+          String recordKey = ((GenericRecord) obj).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+          String partitionPath = ((GenericRecord) obj).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+          hoodieKeys.add(new HoodieKey(recordKey, partitionPath));
+        }
+        // Advance the reader unconditionally so a non-record read can never stall the loop
+        obj = reader.read();
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to read from Parquet file " + filePath, e);
+    }
+    return hoodieKeys;
+  }
+
   public static ParquetMetadata readMetadata(Configuration conf, Path parquetFilePath) {
     ParquetMetadata footer;
     try {
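
As a quick usage sketch, the new helper can be pointed at any base file to list its keys; the path below is a placeholder, and whether HoodieKeyLocationFetchHandle delegates to this method is not shown in this excerpt, so treat that as an assumption:

    // Sketch: dump the HoodieKeys stored in one parquet base file (placeholder path).
    Configuration hadoopConf = new Configuration();
    Path baseFilePath = new Path("/tmp/hudi/test-trip-table/2016/01/31/some-base-file.parquet");
    List<HoodieKey> keys = ParquetUtils.fetchRecordKeyPartitionPathFromParquet(hadoopConf, baseFilePath);
    keys.forEach(k -> System.out.println(k.getRecordKey() + " -> " + k.getPartitionPath()));
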
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
index 2d2084d..15b1602 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
@@ -120,9 +121,38 @@ public class TestParquetUtils extends HoodieCommonTestHarness {
     }
   }
 
+  @ParameterizedTest
+  @MethodSource("bloomFilterTypeCodes")
+  public void testFetchRecordKeyPartitionPathFromParquet(String typeCode) throws Exception {
+    List<String> rowKeys = new ArrayList<>();
+    List<HoodieKey> expected = new ArrayList<>();
+    String partitionPath = "path1";
+    for (int i = 0; i < 1000; i++) {
+      String rowKey = UUID.randomUUID().toString();
+      rowKeys.add(rowKey);
+      expected.add(new HoodieKey(rowKey, partitionPath));
+    }
+
+    String filePath = basePath + "/test.parquet";
+    Schema schema = HoodieAvroUtils.getRecordKeyPartitionPathSchema();
+    writeParquetFile(typeCode, filePath, rowKeys, schema, true, partitionPath);
+
+    // Read and verify
+    List<HoodieKey> fetchedRows =
+        ParquetUtils.fetchRecordKeyPartitionPathFromParquet(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath));
+    assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not match");
+
+    for (HoodieKey entry : fetchedRows) {
+      assertTrue(expected.contains(entry), "Fetched HoodieKey must be present in the expected list");
+    }
+  }
+
   private void writeParquetFile(String typeCode, String filePath, List<String> rowKeys) throws Exception {
+    writeParquetFile(typeCode, filePath, rowKeys, HoodieAvroUtils.getRecordKeySchema(), false, "");
+  }
+
+  private void writeParquetFile(String typeCode, String filePath, List<String> rowKeys, Schema schema, boolean addPartitionPathField, String partitionPath) throws Exception {
     // Write out a parquet file
-    Schema schema = HoodieAvroUtils.getRecordKeySchema();
     BloomFilter filter = BloomFilterFactory
         .createBloomFilter(1000, 0.0001, 10000, typeCode);
     HoodieAvroWriteSupport writeSupport =
@@ -132,6 +162,9 @@ public class TestParquetUtils extends HoodieCommonTestHarness {
     for (String rowKey : rowKeys) {
       GenericRecord rec = new GenericData.Record(schema);
       rec.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, rowKey);
+      if (addPartitionPathField) {
+        rec.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionPath);
+      }
       writer.write(rec);
       writeSupport.add(rowKey);
     }