Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/03/16 00:33:14 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #4693: [WIP][HUDI-3175][RFC-45] Implement async metadata indexing

nsivabalan commented on a change in pull request #4693:
URL: https://github.com/apache/hudi/pull/4693#discussion_r827496666



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -659,20 +691,100 @@ private MetadataRecordsGenerationParams getRecordsGenerationParams() {
 
   /**
    * Processes commit metadata from data table and commits to metadata table.
+   *
    * @param instantTime instant time of interest.
    * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table.
    * @param <T> type of commit metadata.
    * @param canTriggerTableService true if table services can be triggered. false otherwise.
    */
   private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
-    if (enabled && metadata != null) {
-      Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = convertMetadataFunction.convertMetadata();
-      commit(instantTime, partitionRecordsMap, canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        try {
+          initializeFileGroups(dataMetaClient, MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Unable to initialize file groups for metadata partition: %s, instant: %s", p, instantTime));
+        }
+        Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = convertMetadataFunction.convertMetadata();
+        commit(instantTime, partitionRecordsMap, canTriggerTableService);
+      }
+    });
+  }
+
+  private List<String> getMetadataPartitionsToUpdate() {
+    // find last (pending or) completed index instant and get partitions (to be) written
+    Option<HoodieInstant> lastIndexingInstant = dataMetaClient.getActiveTimeline()

Review comment:
       I guess we need to fix this to read from the table properties?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -659,20 +691,100 @@ private MetadataRecordsGenerationParams getRecordsGenerationParams() {
 
   /**
    * Processes commit metadata from data table and commits to metadata table.
+   *
    * @param instantTime instant time of interest.
    * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table.
    * @param <T> type of commit metadata.
    * @param canTriggerTableService true if table services can be triggered. false otherwise.
    */
   private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
-    if (enabled && metadata != null) {
-      Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = convertMetadataFunction.convertMetadata();
-      commit(instantTime, partitionRecordsMap, canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        try {
+          initializeFileGroups(dataMetaClient, MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Unable to initialize file groups for metadata partition: %s, instant: %s", p, instantTime));
+        }
+        Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = convertMetadataFunction.convertMetadata();
+        commit(instantTime, partitionRecordsMap, canTriggerTableService);
+      }
+    });
+  }
+
+  private List<String> getMetadataPartitionsToUpdate() {
+    // find last (pending or) completed index instant and get partitions (to be) written
+    Option<HoodieInstant> lastIndexingInstant = dataMetaClient.getActiveTimeline()
+        .getTimelineOfActions(CollectionUtils.createImmutableSet(HoodieTimeline.INDEX_ACTION)).lastInstant();
+    if (lastIndexingInstant.isPresent()) {
+      try {
+        // TODO: handle inflight instant, if it is inflight then read from requested file.
+        HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(
+            dataMetaClient.getActiveTimeline().readIndexPlanAsBytes(lastIndexingInstant.get()).get());
+        return indexPlan.getIndexPartitionInfos().stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList());
+      } catch (IOException e) {
+        LOG.warn("Could not read index plan. Falling back to FileSystem.exists() check.");
+        return getExistingMetadataPartitions();
+      }
     }
+    // TODO: return only enabled partitions
+    return MetadataPartitionType.allPaths();

Review comment:
       Why do we return all partitions here? What happens in the following cases:
   1. Someone migrated to 0.11 from 0.10, and the files partition was already present.
   2. (1) + one new metadata partition was added and is inflight.
   3. (2) + the new partition is completed.
   
   Can you help me understand what this method would return in each of these cases?
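
   To make the three cases concrete, here is a minimal sketch of one possible contract, not the PR's actual logic. It assumes the writer should update every partition that is either fully built or currently being indexed, and that the upgrade records "files" as completed. The class, the partitionsToUpdate method, and the two input sets are hypothetical; partitions are modeled as plain path strings.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model: partitions are plain path strings, not MetadataPartitionType.
class PartitionsToUpdateSketch {

  // Assumption: a regular writer updates completed partitions and also
  // inflight ones, so new commits are not missed while indexing runs.
  static Set<String> partitionsToUpdate(Set<String> completed, Set<String> inflight) {
    Set<String> result = new TreeSet<>(completed);
    result.addAll(inflight);
    return result;
  }

  public static void main(String[] args) {
    Set<String> files = new HashSet<>(Collections.singletonList("files"));
    Set<String> bloom = new HashSet<>(Collections.singletonList("bloom_filters"));
    Set<String> both = new HashSet<>(Arrays.asList("files", "bloom_filters"));

    // Case 1: migrated table, only "files" is present and complete.
    System.out.println(partitionsToUpdate(files, Collections.emptySet()));
    // Case 2: "bloom_filters" was scheduled and is still inflight.
    System.out.println(partitionsToUpdate(files, bloom));
    // Case 3: indexing of "bloom_filters" has completed.
    System.out.println(partitionsToUpdate(both, Collections.emptySet()));
  }
}
```

   Under these assumptions, cases 2 and 3 return the same two partitions, and case 1 returns only "files". If the intended behavior differs, it would help to document it on the method.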

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+  private static final Logger LOG = LogManager.getLogger(RunIndexActionExecutor.class);
+  private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+  private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION = INDEX_COMMIT_METADATA_VERSION_1;
+  private static final int MAX_CONCURRENT_INDEXING = 1;
+
+  public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
+    super(context, config, table, instantTime);
+  }
+
+  @Override
+  public Option<HoodieIndexCommitMetadata> execute() {
+    HoodieTimer indexTimer = new HoodieTimer();
+    indexTimer.startTimer();
+
+    HoodieInstant indexInstant = table.getActiveTimeline()
+        .filterPendingIndexTimeline()
+        .filter(instant -> instant.getTimestamp().equals(instantTime))
+        .lastInstant()
+        .orElseThrow(() -> new HoodieIndexException(String.format("No pending index instant found: %s", instantTime)));
+    ValidationUtils.checkArgument(HoodieInstant.State.INFLIGHT.equals(indexInstant.getState()),
+        String.format("Index instant %s already inflight", instantTime));
+    try {
+      // read HoodieIndexPlan assuming indexInstant is requested
+      // TODO: handle inflight instant, if it is inflight then throw error.
+      HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+      List<HoodieIndexPartitionInfo> indexPartitionInfos = indexPlan.getIndexPartitionInfos();
+      if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+        throw new HoodieIndexException(String.format("No partitions to index for instant: %s", instantTime));
+      }
+      // transition requested indexInstant to inflight
+      table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant, Option.empty());
+      // start indexing for each partition
+      HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
+          .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
+      metadataWriter.index(context, indexPartitionInfos);
+      // get all completed instants since the plan completed
+      // assumption is that all metadata partitions had same instant upto which they were scheduled to be indexed
+      String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
+      Stream<HoodieInstant> remainingInstantsToIndex = table.getActiveTimeline().getWriteTimeline().getReverseOrderedInstants()

Review comment:
       Can we move the catch-up indexing to a separate method?
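
   Roughly what I have in mind, as a minimal sketch rather than a drop-in change: a small helper that only decides which instants still need to be applied. The class and the instantsToCatchUp method are hypothetical, and instants are reduced to timestamp strings, assuming they share one format so that string order matches time order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model: instants are reduced to their timestamp strings.
class CatchUpSketch {

  // Returns the completed instants that landed after the instant the index
  // plan was built up to, oldest first.
  static List<String> instantsToCatchUp(List<String> completedInstantTimes, String indexUptoInstant) {
    return completedInstantTimes.stream()
        .filter(t -> t.compareTo(indexUptoInstant) > 0)
        .sorted()
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> completed = Arrays.asList("20220316003000", "20220316001000", "20220316002000");
    // Prints [20220316002000, 20220316003000]
    System.out.println(instantsToCatchUp(completed, "20220316001000"));
  }
}
```

   Keeping the selection logic apart from execute() would also make it easy to unit test on its own.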

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -588,10 +609,87 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
    * @param canTriggerTableService true if table services can be triggered. false otherwise.
    */
   private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
-    if (enabled && metadata != null) {
-      List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
-      commit(engineContext.parallelize(records, 1), MetadataPartitionType.FILES.partitionPath(), instantTime, canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        try {
+          initializeFileGroups(dataMetaClient, MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Unable to initialize file groups for metadata partition: %s, instant: %s", p, instantTime));
+        }
+        List<HoodieRecord> records = convertMetadataFunction.convertMetadata();
+        commit(engineContext.parallelize(records, 1), p, instantTime, canTriggerTableService);
+      }
+    });
+  }
+
+  private List<String> getMetadataPartitionsToUpdate() {
+    // find last (pending or) completed index instant and get partitions (to be) written
+    Option<HoodieInstant> lastIndexingInstant = dataMetaClient.getActiveTimeline()
+        .getTimelineOfActions(CollectionUtils.createImmutableSet(HoodieTimeline.INDEX_ACTION)).lastInstant();
+    if (lastIndexingInstant.isPresent()) {
+      try {
+        // TODO: handle inflight instant, if it is inflight then read from requested file.
+        HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(
+            dataMetaClient.getActiveTimeline().readIndexPlanAsBytes(lastIndexingInstant.get()).get());
+        return indexPlan.getIndexPartitionInfos().stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList());
+      } catch (IOException e) {
+        LOG.warn("Could not read index plan. Falling back to FileSystem.exists() check.");
+        return getExistingMetadataPartitions();

Review comment:
       We can't fall back to fetching all partitions, right? Some could be inflight and not fully completed w.r.t. index building. Or am I missing something?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -620,8 +636,14 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
 
     LOG.info(String.format("Creating %d file groups for partition %s with base fileId %s at instant time %s",
         fileGroupCount, metadataPartition.getPartitionPath(), metadataPartition.getFileIdPrefix(), instantTime));
+    HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemView(metadataMetaClient);
+    List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.ofNullable(fsView), metadataPartition.getPartitionPath());
     for (int i = 0; i < fileGroupCount; ++i) {
       final String fileGroupFileId = String.format("%s%04d", metadataPartition.getFileIdPrefix(), i);
+      // if a writer or async indexer had already initialized the filegroup then continue
+      if (!fileSlices.isEmpty() && fileSlices.stream().anyMatch(fileSlice -> fileGroupFileId.equals(fileSlice.getFileGroupId().getFileId()))) {
+        continue;

Review comment:
       Can you help me understand how a partially failed file group instantiation is handled? Do we clean up all file groups and start from scratch, or do we continue from where we left off? I mean when the indexer restarts the next time around.
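
   If the intent is to continue from where we left off, the check above amounts to something like the following minimal sketch. The class and the missingFileGroupIds helper are hypothetical, and the existing file IDs are passed in as a plain set instead of being read from file slices. The file ID format is the one used in the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of resuming file group initialization after a partial failure.
class FileGroupInitSketch {

  // Returns the file group IDs that still have to be created, skipping the
  // ones a previous (possibly failed) attempt already initialized.
  static List<String> missingFileGroupIds(String fileIdPrefix, int fileGroupCount, Set<String> existingFileIds) {
    List<String> missing = new ArrayList<>();
    for (int i = 0; i < fileGroupCount; ++i) {
      String fileId = String.format("%s%04d", fileIdPrefix, i);
      if (!existingFileIds.contains(fileId)) {
        missing.add(fileId);
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    // Assume an earlier attempt created two of four file groups before failing.
    Set<String> existing = new HashSet<>(Arrays.asList("files-0000", "files-0001"));
    // Prints [files-0002, files-0003]
    System.out.println(missingFileGroupIds("files-", 4, existing));
  }
}
```

   This only covers file groups that are entirely missing. A file group that was half written by the failed attempt would still need separate handling.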

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -659,20 +691,100 @@ private MetadataRecordsGenerationParams getRecordsGenerationParams() {
 
   /**
    * Processes commit metadata from data table and commits to metadata table.
+   *
    * @param instantTime instant time of interest.
    * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table.
    * @param <T> type of commit metadata.
    * @param canTriggerTableService true if table services can be triggered. false otherwise.
    */
   private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
-    if (enabled && metadata != null) {
-      Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = convertMetadataFunction.convertMetadata();
-      commit(instantTime, partitionRecordsMap, canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        try {
+          initializeFileGroups(dataMetaClient, MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);

Review comment:
       Why are we initializing file groups here? If I am not wrong, this is called in the synchronous code path where the data table is looking to apply a commit to the MDT. With async metadata indexing, wouldn't scheduling take responsibility for initializing the file groups?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -659,20 +691,100 @@ private MetadataRecordsGenerationParams getRecordsGenerationParams() {
 
   /**
    * Processes commit metadata from data table and commits to metadata table.
+   *
    * @param instantTime instant time of interest.
    * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table.
    * @param <T> type of commit metadata.
    * @param canTriggerTableService true if table services can be triggered. false otherwise.
    */
   private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
-    if (enabled && metadata != null) {
-      Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = convertMetadataFunction.convertMetadata();
-      commit(instantTime, partitionRecordsMap, canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();

Review comment:
       How does this work for a table that migrated from 0.10.0, for example? It may not have added the "files" partition to the table properties, right? I.e., the list of fully completed metadata partitions.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/bloom/BloomFilter.java
##########
@@ -30,6 +34,13 @@
    */
   void add(String key);
 
+  /**
+   * Add secondary key to the {@link BloomFilter}.
+   *
+   * @param keys list of secondary keys to add to the {@link BloomFilter}
+   */
+  void add(@Nonnull List<String> keys);

Review comment:
       Can you help me understand the purpose of adding the secondary keys? I don't see a method similar to mightContain for secondary keys.
   Also, can we give the method a name that conveys that it adds secondary keys?
   Maybe addSecondaryKeys().
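
   A minimal sketch of the naming I am suggesting, assuming secondary keys are inserted exactly like primary keys (I am not sure that is the intent). KeyFilter and SetBackedFilter are hypothetical stand-ins, and an exact set takes the place of a real Bloom filter.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class BloomFilterNamingSketch {

  // Hypothetical stand-in for the BloomFilter interface, reduced to what the naming question needs.
  interface KeyFilter {
    void add(String key);

    boolean mightContain(String key);

    // Descriptive name instead of an add(List<String>) overload.
    // Assumption: secondary keys are inserted exactly like primary keys.
    default void addSecondaryKeys(List<String> keys) {
      keys.forEach(this::add);
    }
  }

  // Exact set used in place of a real Bloom filter, so there are no false positives here.
  static class SetBackedFilter implements KeyFilter {
    private final Set<String> keys = new HashSet<>();

    @Override
    public void add(String key) {
      keys.add(key);
    }

    @Override
    public boolean mightContain(String key) {
      return keys.contains(key);
    }
  }

  public static void main(String[] args) {
    KeyFilter filter = new SetBackedFilter();
    filter.add("primary-1");
    filter.addSecondaryKeys(Arrays.asList("secondary-1", "secondary-2"));
    System.out.println(filter.mightContain("secondary-2")); // true
    System.out.println(filter.mightContain("secondary-3")); // false
  }
}
```

   With this shape the existing mightContain covers both kinds of keys, which would answer my first question. If secondary keys need a separate lookup, the interface should say so.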

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -659,20 +691,100 @@ private MetadataRecordsGenerationParams getRecordsGenerationParams() {
 
   /**
    * Processes commit metadata from data table and commits to metadata table.
+   *
    * @param instantTime instant time of interest.
    * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table.
    * @param <T> type of commit metadata.
    * @param canTriggerTableService true if table services can be triggered. false otherwise.
    */
   private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
-    if (enabled && metadata != null) {
-      Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = convertMetadataFunction.convertMetadata();
-      commit(instantTime, partitionRecordsMap, canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        try {
+          initializeFileGroups(dataMetaClient, MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Unable to initialize file groups for metadata partition: %s, instant: %s", p, instantTime));
+        }
+        Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = convertMetadataFunction.convertMetadata();
+        commit(instantTime, partitionRecordsMap, canTriggerTableService);
+      }
+    });
+  }
+
+  private List<String> getMetadataPartitionsToUpdate() {
+    // find last (pending or) completed index instant and get partitions (to be) written
+    Option<HoodieInstant> lastIndexingInstant = dataMetaClient.getActiveTimeline()
+        .getTimelineOfActions(CollectionUtils.createImmutableSet(HoodieTimeline.INDEX_ACTION)).lastInstant();
+    if (lastIndexingInstant.isPresent()) {
+      try {
+        // TODO: handle inflight instant, if it is inflight then read from requested file.
+        HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(
+            dataMetaClient.getActiveTimeline().readIndexPlanAsBytes(lastIndexingInstant.get()).get());
+        return indexPlan.getIndexPartitionInfos().stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList());
+      } catch (IOException e) {
+        LOG.warn("Could not read index plan. Falling back to FileSystem.exists() check.");
+        return getExistingMetadataPartitions();
+      }
     }
+    // TODO: return only enabled partitions
+    return MetadataPartitionType.allPaths();
+  }
+
+  private List<String> getExistingMetadataPartitions() {
+    return MetadataPartitionType.allPaths().stream()
+        .filter(p -> {
+          try {
+            // TODO: avoid fs.exists() check
+            return metadataMetaClient.getFs().exists(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), p));
+          } catch (IOException e) {
+            return false;
+          }
+        })
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void index(HoodieEngineContext engineContext, List<HoodieIndexPartitionInfo> indexPartitionInfos) {
+    indexPartitionInfos.forEach(indexPartitionInfo -> {
+      String indexUptoInstantTime = indexPartitionInfo.getIndexUptoInstant();
+      String relativePartitionPath = indexPartitionInfo.getMetadataPartitionPath();
+      LOG.info(String.format("Creating a new metadata index for partition '%s' under path %s upto instant %s",
+          relativePartitionPath, metadataWriteConfig.getBasePath(), indexUptoInstantTime));
+      try {
+        HoodieTableMetaClient.withPropertyBuilder()
+            .setTableType(HoodieTableType.MERGE_ON_READ)
+            .setTableName(tableName)
+            .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
+            .setPayloadClassName(HoodieMetadataPayload.class.getName())
+            .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
+            .setRecordKeyFields(RECORD_KEY_FIELD_NAME)
+            .setPopulateMetaFields(dataWriteConfig.getMetadataConfig().populateMetaFields())
+            .setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
+            .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());

Review comment:
       Shouldn't we call initTable only once, when the MDT is instantiated for the first time?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -641,12 +663,22 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
     }
   }
 
+  public void dropIndex(List<MetadataPartitionType> indexesToDrop) throws IOException {
+    // TODO: update table config and do it in a transaction

Review comment:
       Please file a tracking ticket if we don't have one.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
##########
@@ -19,17 +19,28 @@
 package org.apache.hudi.metadata;
 
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 
+import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
 
 /**
  * Interface that supports updating metadata for a given table, as actions complete.
  */
 public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {
 
+  void index(HoodieEngineContext engineContext, List<HoodieIndexPartitionInfo> indexPartitionInfos);

Review comment:
       Please add Javadocs for this method.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Schedules INDEX action.
+ * <li>
+ *   1. Fetch last completed instant on data timeline.
+ *   2. Write the index plan to the <instant>.index.requested.
+ *   3. Initialize filegroups for the enabled partition types within a transaction.
+ * </li>
+ */
+public class ScheduleIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexPlan>> {
+
+  private static final Logger LOG = LogManager.getLogger(ScheduleIndexActionExecutor.class);
+  private static final Integer INDEX_PLAN_VERSION_1 = 1;
+  private static final Integer LATEST_INDEX_PLAN_VERSION = INDEX_PLAN_VERSION_1;
+
+  private final List<MetadataPartitionType> partitionsToIndex;
+  private final TransactionManager txnManager;
+
+  public ScheduleIndexActionExecutor(HoodieEngineContext context,
+                                     HoodieWriteConfig config,
+                                     HoodieTable<T, I, K, O> table,
+                                     String instantTime,
+                                     List<MetadataPartitionType> partitionsToIndex) {
+    super(context, config, table, instantTime);
+    this.partitionsToIndex = partitionsToIndex;
+    this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
+  }
+
+  @Override
+  public Option<HoodieIndexPlan> execute() {
+    // validate partitionsToIndex
+    if (!MetadataPartitionType.allPaths().containsAll(partitionsToIndex)) {
+      throw new HoodieIndexException("Not all partitions are valid: " + partitionsToIndex);
+    }
+    // get last completed instant
+    Option<HoodieInstant> indexUptoInstant = table.getActiveTimeline().filterCompletedInstants().lastInstant();
+    if (indexUptoInstant.isPresent()) {
+      final HoodieInstant indexInstant = HoodieTimeline.getIndexRequestedInstant(instantTime);
+      // for each partitionToIndex add that time to the plan
+      List<HoodieIndexPartitionInfo> indexPartitionInfos = partitionsToIndex.stream()
+          .map(p -> new HoodieIndexPartitionInfo(LATEST_INDEX_PLAN_VERSION, p.getPartitionPath(), indexUptoInstant.get().getTimestamp()))
+          .collect(Collectors.toList());
+      HoodieIndexPlan indexPlan = new HoodieIndexPlan(LATEST_INDEX_PLAN_VERSION, indexPartitionInfos);
+      try {
+        table.getActiveTimeline().saveToPendingIndexCommit(indexInstant, TimelineMetadataUtils.serializeIndexPlan(indexPlan));
+      } catch (IOException e) {
+        LOG.error("Error while saving index requested file", e);
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+      table.getMetaClient().reloadActiveTimeline();
+
+      // start initializing filegroups
+      // 1. get metadata writer
+      HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
+          .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
+      // 2. take a lock --> begin tx (data table)
+      try {
+        this.txnManager.beginTransaction(Option.of(indexInstant), Option.empty());
+        // 3. initialize filegroups as per plan for the enabled partition types
+        for (MetadataPartitionType partitionType : partitionsToIndex) {
+          metadataWriter.initializeFileGroups(table.getMetaClient(), partitionType, indexInstant.getTimestamp(), 1);
+        }
+      } catch (IOException e) {
+        LOG.error("Could not initialize file groups");
+        throw new HoodieIOException(e.getMessage(), e);
+      } finally {
+        this.txnManager.endTransaction(Option.of(indexInstant));
+      }
+      return Option.of(indexPlan);
+    }
+    return Option.empty();

Review comment:
       If someone triggers this for an empty table, what's the expected behavior? Do we update tableConfig to record that index building is complete?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -659,20 +691,100 @@ private MetadataRecordsGenerationParams getRecordsGenerationParams() {
 
   /**
    * Processes commit metadata from data table and commits to metadata table.
+   *
    * @param instantTime instant time of interest.
    * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table.
    * @param <T> type of commit metadata.
    * @param canTriggerTableService true if table services can be triggered. false otherwise.
    */
   private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) {
-    if (enabled && metadata != null) {
-      Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = convertMetadataFunction.convertMetadata();
-      commit(instantTime, partitionRecordsMap, canTriggerTableService);
+    List<String> partitionsToUpdate = getMetadataPartitionsToUpdate();
+    partitionsToUpdate.forEach(p -> {
+      if (enabled && metadata != null) {
+        try {
+          initializeFileGroups(dataMetaClient, MetadataPartitionType.valueOf(p.toUpperCase(Locale.ROOT)), instantTime, 1);
+        } catch (IOException e) {
+          throw new HoodieIndexException(String.format("Unable to initialize file groups for metadata partition: %s, instant: %s", p, instantTime));
+        }
+        Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = convertMetadataFunction.convertMetadata();
+        commit(instantTime, partitionRecordsMap, canTriggerTableService);
+      }
+    });
+  }
+
+  private List<String> getMetadataPartitionsToUpdate() {
+    // find last (pending or) completed index instant and get partitions (to be) written
+    Option<HoodieInstant> lastIndexingInstant = dataMetaClient.getActiveTimeline()
+        .getTimelineOfActions(CollectionUtils.createImmutableSet(HoodieTimeline.INDEX_ACTION)).lastInstant();
+    if (lastIndexingInstant.isPresent()) {
+      try {
+        // TODO: handle inflight instant, if it is inflight then read from requested file.
+        HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(
+            dataMetaClient.getActiveTimeline().readIndexPlanAsBytes(lastIndexingInstant.get()).get());
+        return indexPlan.getIndexPartitionInfos().stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toList());
+      } catch (IOException e) {
+        LOG.warn("Could not read index plan. Falling back to FileSystem.exists() check.");
+        return getExistingMetadataPartitions();
+      }
     }
+    // TODO: return only enabled partitions
+    return MetadataPartitionType.allPaths();
+  }
+
+  private List<String> getExistingMetadataPartitions() {
+    return MetadataPartitionType.allPaths().stream()
+        .filter(p -> {
+          try {
+            // TODO: avoid fs.exists() check
+            return metadataMetaClient.getFs().exists(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(), p));
+          } catch (IOException e) {
+            return false;
+          }
+        })
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public void index(HoodieEngineContext engineContext, List<HoodieIndexPartitionInfo> indexPartitionInfos) {
+    indexPartitionInfos.forEach(indexPartitionInfo -> {
+      String indexUptoInstantTime = indexPartitionInfo.getIndexUptoInstant();
+      String relativePartitionPath = indexPartitionInfo.getMetadataPartitionPath();
+      LOG.info(String.format("Creating a new metadata index for partition '%s' under path %s upto instant %s",
+          relativePartitionPath, metadataWriteConfig.getBasePath(), indexUptoInstantTime));
+      try {
+        HoodieTableMetaClient.withPropertyBuilder()
+            .setTableType(HoodieTableType.MERGE_ON_READ)
+            .setTableName(tableName)
+            .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
+            .setPayloadClassName(HoodieMetadataPayload.class.getName())
+            .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
+            .setRecordKeyFields(RECORD_KEY_FIELD_NAME)
+            .setPopulateMetaFields(dataWriteConfig.getMetadataConfig().populateMetaFields())
+            .setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
+            .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
+        initTableMetadata();
+        // this part now moves to scheduling
+        initializeFileGroups(dataMetaClient, MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT)), indexUptoInstantTime, 1);
+      } catch (IOException e) {
+        throw new HoodieIndexException(String.format("Unable to initialize file groups for metadata partition: %s, indexUptoInstant: %s",
+            relativePartitionPath, indexUptoInstantTime));
+      }
+
+      // List all partitions in the basePath of the containing dataset
+      LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath());
+      engineContext.setJobStatus(this.getClass().getSimpleName(), "MetadataIndex: initializing metadata table by listing files and partitions");
+      List<DirectoryInfo> dirInfoList = listAllPartitions(dataMetaClient);
+
+      // During bootstrap, the list of files to be committed can be huge. So creating a HoodieCommitMetadata out of these
+      // large number of files and calling the existing update(HoodieCommitMetadata) function does not scale well.
+      // Hence, we have a special commit just for the bootstrap scenario.
+      initialCommit(indexUptoInstantTime);

Review comment:
       Is this applicable only to the initialization of the first partition in the metadata table?
   If not, for subsequent partitions, shouldn't initialCommit take in a list of metadata partitions to be initialized?
   Sorry, I guess I am missing something here.
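
   To make the question concrete, here is a minimal sketch of how the set of partitions passed to an initial commit could be narrowed down. PartitionInitPlanner and partitionsToInitialize are hypothetical names used only for illustration; they are not part of this PR or of Hudi, and the partition paths in the usage note are just sample values.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper (not Hudi API): picks the metadata partitions that still need an initial commit. */
class PartitionInitPlanner {

  /**
   * Returns the requested partitions that are not yet initialized,
   * preserving the order in which they were requested and dropping duplicates.
   */
  static List<String> partitionsToInitialize(List<String> requested, Set<String> alreadyInitialized) {
    Set<String> pending = new LinkedHashSet<>(requested);
    pending.removeAll(alreadyInitialized);
    return new ArrayList<>(pending);
  }
}
```

   With "files" already built and "files" plus "column_stats" requested, only "column_stats" would be handed to the initial commit under this assumption.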

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {

Review comment:
       Did we add any additional/explicit metrics for the async metadata indexer, e.g. time for base file initialization, time for catch-up, etc.?
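
   As an illustration of what per-phase metrics could look like, below is a small sketch that accumulates wall-clock time per named phase. IndexPhaseTimer is a hypothetical class, not something in this PR or in Hudi; the phase names are placeholders, and wiring the values into the existing metrics registry is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical helper (not Hudi API): accumulates elapsed milliseconds per indexing phase. */
class IndexPhaseTimer {
  private final LongSupplier nanoClock;
  private final Map<String, Long> phaseMillis = new LinkedHashMap<>();

  /** The clock is injectable so that the timer can be tested deterministically. */
  IndexPhaseTimer(LongSupplier nanoClock) {
    this.nanoClock = nanoClock;
  }

  IndexPhaseTimer() {
    this(System::nanoTime);
  }

  /** Runs the work and adds its duration to the phase total, even if the work throws. */
  void time(String phase, Runnable work) {
    long start = nanoClock.getAsLong();
    try {
      work.run();
    } finally {
      long elapsedMs = (nanoClock.getAsLong() - start) / 1_000_000L;
      phaseMillis.merge(phase, elapsedMs, Long::sum);
    }
  }

  /** Returns a copy of the totals recorded so far, in first-seen phase order. */
  Map<String, Long> snapshot() {
    return new LinkedHashMap<>(phaseMillis);
  }
}
```

   The executor could wrap the base file initialization and the catch-up loop in separate time(...) calls and publish the snapshot when the index instant completes.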
   

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+  private static final Logger LOG = LogManager.getLogger(RunIndexActionExecutor.class);
+  private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+  private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION = INDEX_COMMIT_METADATA_VERSION_1;
+  private static final int MAX_CONCURRENT_INDEXING = 1;
+
+  public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
+    super(context, config, table, instantTime);
+  }
+
+  @Override
+  public Option<HoodieIndexCommitMetadata> execute() {
+    HoodieTimer indexTimer = new HoodieTimer();
+    indexTimer.startTimer();
+
+    HoodieInstant indexInstant = table.getActiveTimeline()
+        .filterPendingIndexTimeline()
+        .filter(instant -> instant.getTimestamp().equals(instantTime))
+        .lastInstant()
+        .orElseThrow(() -> new HoodieIndexException(String.format("No pending index instant found: %s", instantTime)));
+    ValidationUtils.checkArgument(HoodieInstant.State.INFLIGHT.equals(indexInstant.getState()),
+        String.format("Index instant %s already inflight", instantTime));
+    try {
+      // read HoodieIndexPlan assuming indexInstant is requested
+      // TODO: handle inflight instant, if it is inflight then throw error.
+      HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+      List<HoodieIndexPartitionInfo> indexPartitionInfos = indexPlan.getIndexPartitionInfos();
+      if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+        throw new HoodieIndexException(String.format("No partitions to index for instant: %s", instantTime));
+      }
+      // transition requested indexInstant to inflight
+      table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant, Option.empty());
+      // start indexing for each partition
+      HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
+          .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
+      metadataWriter.index(context, indexPartitionInfos);
+      // get all completed instants since the plan completed
+      // assumption is that all metadata partitions had same instant upto which they were scheduled to be indexed
+      String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
+      Stream<HoodieInstant> remainingInstantsToIndex = table.getActiveTimeline().getWriteTimeline().getReverseOrderedInstants()
+          .filter(instant -> instant.isCompleted() && HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), indexUptoInstant));
+      // reconcile with metadata table timeline
+      String metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(table.getMetaClient().getBasePath());
+      HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();

Review comment:
       This might need some thought. Let's think through all the different scenarios.
   MDT partition1 was already built out.
   Index building is triggered for MDT partition2.
   In this case, would compaction kick in just for partition1 in the MDT, or do we block compaction in general?
   Also, how do we guard archival in the MDT timeline in this case?
   

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.MetadataPartitionType;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+public class HoodieIndexer {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+
+  private final HoodieIndexer.Config cfg;
+  private TypedProperties props;
+  private final JavaSparkContext jsc;
+  private final HoodieTableMetaClient metaClient;
+
+  public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    this.cfg = cfg;
+    this.jsc = jsc;
+    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+  }
+
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
+    public String basePath = null;
+    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
+    public String tableName = null;
+    @Parameter(names = {"--instant-time", "-it"}, description = "Indexing Instant time")
+    public String indexInstantTime = null;
+    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
+    public int parallelism = 1;
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+    public String sparkMaster = null;
+    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
+    public String sparkMemory = null;
+    @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
+    public int retry = 0;
+    @Parameter(names = {"--schedule", "-sc"}, description = "Schedule indexing")
+    public Boolean runSchedule = false;
+    @Parameter(names = {"--strategy", "-st"}, description = "Comma-separated index types to be built, e.g. BLOOM,FILES,COLSTATS")
+    public String indexTypes = null;
+    @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" to generate an indexing plan; "
+        + "Set \"execute\" to execute the indexing plan at the given instant, which means --instant-time is required here; "
+        + "Set \"scheduleAndExecute\" to generate an indexing plan first and execute that plan immediately")
+    public String runningMode = null;
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+
+    @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+        + "hoodie client for compacting")
+    public String propsFilePath = null;
+
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+        + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
+    public List<String> configs = new ArrayList<>();
+  }
+
+  public static void main(String[] args) {
+    final HoodieIndexer.Config cfg = new HoodieIndexer.Config();
+    JCommander cmd = new JCommander(cfg, null, args);
+
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+
+    final JavaSparkContext jsc = UtilHelpers.buildSparkContext("indexing-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
+    HoodieIndexer indexer = new HoodieIndexer(jsc, cfg);
+    int result = indexer.start(cfg.retry);
+    String resultMsg = String.format("Indexing with basePath: %s, tableName: %s, runningMode: %s",
+        cfg.basePath, cfg.tableName, cfg.runningMode);
+    if (result == -1) {
+      LOG.error(resultMsg + " failed");
+    } else {
+      LOG.info(resultMsg + " success");
+    }
+    jsc.stop();
+  }
+
+  private int start(int retry) {
+    return UtilHelpers.retry(retry, () -> {
+      switch (cfg.runningMode.toLowerCase()) {
+        case SCHEDULE: {
+          LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
+          Option<String> instantTime = scheduleIndexing(jsc);
+          int result = instantTime.isPresent() ? 0 : -1;
+          if (result == 0) {
+            LOG.info("The schedule instant time is " + instantTime.get());
+          }
+          return result;
+        }
+        case SCHEDULE_AND_EXECUTE: {
+          LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
+          return scheduleAndRunIndexing(jsc);
+        }
+        case EXECUTE: {
+          LOG.info("Running Mode: [" + EXECUTE + "];");
+          return runIndexing(jsc);
+        }
+        default: {
+          LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit the job directly");
+          return -1;
+        }
+      }
+    }, "Indexer failed");
+  }
+
+  private Option<String> scheduleIndexing(JavaSparkContext jsc) throws Exception {
+    String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
+    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      return doSchedule(client);
+    }
+  }
+
+  private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> client) {
+    List<String> partitionsToIndex = Arrays.asList(cfg.indexTypes.split(","));
+    List<MetadataPartitionType> partitionTypes = partitionsToIndex.stream()
+        .map(MetadataPartitionType::valueOf).collect(Collectors.toList());
+    Option<String> indexingInstant = client.scheduleIndexing(partitionTypes);
+    if (!indexingInstant.isPresent()) {
+      LOG.error("Scheduling of index action did not return any instant.");
+    }
+    return indexingInstant;
+  }
+
+  private int runIndexing(JavaSparkContext jsc) throws Exception {
+    String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
+    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      if (StringUtils.isNullOrEmpty(cfg.indexInstantTime)) {
+        // Instant time is not specified
+        // Find the earliest scheduled indexing instant for execution
+        Option<HoodieInstant> earliestPendingIndexInstant = metaClient.getActiveTimeline()
+            .filterPendingIndexTimeline()
+            .filter(i -> !(i.isCompleted() || INFLIGHT.equals(i.getState())))
+            .firstInstant();
+        if (earliestPendingIndexInstant.isPresent()) {
+          cfg.indexInstantTime = earliestPendingIndexInstant.get().getTimestamp();
+          LOG.info("Found the earliest scheduled indexing instant which will be executed: "
+              + cfg.indexInstantTime);
+        } else {
+          throw new HoodieIndexException("There is no scheduled indexing in the table.");
+        }
+      }
+      return handleError(client.index(cfg.indexInstantTime));
+    }
+  }
+
+  private int scheduleAndRunIndexing(JavaSparkContext jsc) throws Exception {
+    String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
+    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      Option<String> indexingInstantTime = doSchedule(client);
+      if (indexingInstantTime.isPresent()) {
+        return handleError(client.index(indexingInstantTime.get()));

Review comment:
       handleResponse may be a better name.
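
   A rough sketch of the rename, assuming the method simply maps "commit metadata present" to 0 and "absent" to -1 (the body of handleError is not visible in this hunk, so that mapping is an assumption based on how main() checks the result). IndexerResponses is a hypothetical holder class, and java.util.Optional stands in for Hudi's Option to keep the snippet self-contained.

```java
import java.util.Optional;

/** Hypothetical holder (not Hudi API) showing the suggested name and an assumed return-code mapping. */
class IndexerResponses {
  static final int SUCCESS = 0;
  static final int FAILURE = -1;

  /** Returns SUCCESS when the index action produced commit metadata, FAILURE otherwise. */
  static <T> int handleResponse(Optional<T> commitMetadata) {
    return commitMetadata.isPresent() ? SUCCESS : FAILURE;
  }
}
```

   The name then covers both outcomes, which matches how the call sites use the return value.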

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -608,7 +624,7 @@ private void initializeEnabledFileGroups(HoodieTableMetaClient dataMetaClient, S
    * File groups will be named as :
    *    record-index-bucket-0000, .... -> ..., record-index-bucket-0009
    */
-  private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, MetadataPartitionType metadataPartition, String instantTime,
+  public void initializeFileGroups(HoodieTableMetaClient dataMetaClient, MetadataPartitionType metadataPartition, String instantTime,

Review comment:
       Can we check the bootstrapping code snippet? For example, we check the latest synced instant in the metadata table and check whether it's already archived in the data table.
   With multiple partitions, each partition could be instantiated at a different point in time. Can we check all such guards/conditions and ensure they are all intact with the latest state of the metadata table?
   

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.MetadataPartitionType;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+public class HoodieIndexer {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+
+  private final HoodieIndexer.Config cfg;
+  private TypedProperties props;
+  private final JavaSparkContext jsc;
+  private final HoodieTableMetaClient metaClient;
+
+  public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    this.cfg = cfg;
+    this.jsc = jsc;
+    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+  }
+
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
+    public String basePath = null;
+    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
+    public String tableName = null;
+    @Parameter(names = {"--instant-time", "-it"}, description = "Indexing Instant time")
+    public String indexInstantTime = null;
+    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
+    public int parallelism = 1;
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+    public String sparkMaster = null;
+    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
+    public String sparkMemory = null;
+    @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
+    public int retry = 0;
+    @Parameter(names = {"--schedule", "-sc"}, description = "Schedule indexing")
+    public Boolean runSchedule = false;
+    @Parameter(names = {"--strategy", "-st"}, description = "Comma-separated index types to be built, e.g. BLOOM,FILES,COLSTATS")
+    public String indexTypes = null;
+    @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" to generate an indexing plan; "

Review comment:
       How is this different from the runSchedule param? It's a bit confusing.
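
   One way to remove the ambiguity would be to resolve both inputs into a single mode up front. The sketch below is only an illustration: IndexerMode and resolve are hypothetical names, and the rule "an explicit --mode wins, otherwise fall back to the --schedule flag" is an assumption, not something this PR defines.

```java
import java.util.Locale;

/** Hypothetical enum (not Hudi API): a single source of truth for the indexer's running mode. */
enum IndexerMode {
  SCHEDULE, EXECUTE, SCHEDULE_AND_EXECUTE;

  /**
   * Resolves the effective mode. An explicit mode string takes precedence;
   * when it is missing, the boolean schedule flag decides between SCHEDULE and EXECUTE.
   */
  static IndexerMode resolve(String mode, boolean scheduleFlag) {
    if (mode == null || mode.trim().isEmpty()) {
      return scheduleFlag ? SCHEDULE : EXECUTE;
    }
    switch (mode.trim().toLowerCase(Locale.ROOT)) {
      case "schedule":
        return SCHEDULE;
      case "execute":
        return EXECUTE;
      case "scheduleandexecute":
        return SCHEDULE_AND_EXECUTE;
      default:
        throw new IllegalArgumentException("Unsupported running mode: " + mode);
    }
  }
}
```

   With something like this, an unsupported mode fails fast instead of being logged at info level, and --schedule could be dropped or kept purely as a legacy fallback.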

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -889,6 +890,33 @@ public static HoodieTableFileSystemView getFileSystemView(HoodieTableMetaClient
     }
   }
 
+  /**
+   * Get the column names for the table for column stats indexing
+   *
+   * @param recordsGenerationParams - all parameters required to generate metadata index for enabled index types
+   * @return List of column names for which column stats index is enabled
+   */
+  private static List<String> getColumnsToIndex(MetadataRecordsGenerationParams recordsGenerationParams) {
+    if (!recordsGenerationParams.isAllColumnStatsIndexEnabled()
+        || recordsGenerationParams.getDataMetaClient().getCommitsTimeline().filterCompletedInstants().countInstants() < 1) {
+      return Arrays.asList(recordsGenerationParams.getDataMetaClient().getTableConfig().getRecordKeyFieldProp().split(","));
+    }
+
+    if (!recordsGenerationParams.getColumnsToIndex().isEmpty()) {
+      return recordsGenerationParams.getColumnsToIndex();
+    }
+
+    TableSchemaResolver schemaResolver = new TableSchemaResolver(recordsGenerationParams.getDataMetaClient());
+    // consider nested fields as well. if column stats is enabled only for a subset of columns,

Review comment:
       I guess part of this comment can be removed.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.MetadataPartitionType;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+public class HoodieIndexer {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+
+  private final HoodieIndexer.Config cfg;
+  private TypedProperties props;
+  private final JavaSparkContext jsc;
+  private final HoodieTableMetaClient metaClient;
+
+  public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    this.cfg = cfg;
+    this.jsc = jsc;
+    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+  }
+
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
+    public String basePath = null;
+    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
+    public String tableName = null;
+    @Parameter(names = {"--instant-time", "-it"}, description = "Indexing Instant time")
+    public String indexInstantTime = null;
+    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
+    public int parallelism = 1;
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+    public String sparkMaster = null;
+    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
+    public String sparkMemory = null;
+    @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
+    public int retry = 0;
+    @Parameter(names = {"--schedule", "-sc"}, description = "Schedule indexing")
+    public Boolean runSchedule = false;
+    @Parameter(names = {"--strategy", "-st"}, description = "Comma-separated index types to be built, e.g. BLOOM,FILES,COLSTATS")
+    public String indexTypes = null;
+    @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" to generate an indexing plan; "
+        + "Set \"execute\" to execute the indexing plan at the given instant, which means --instant-time is required here; "
+        + "Set \"scheduleAndExecute\" to generate an indexing plan first and execute that plan immediately")
+    public String runningMode = null;
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+
+    @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+        + "hoodie client for compacting")

Review comment:
       Minor: "compacting" -> "indexing".

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+  private static final Logger LOG = LogManager.getLogger(RunIndexActionExecutor.class);
+  private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+  private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION = INDEX_COMMIT_METADATA_VERSION_1;
+  private static final int MAX_CONCURRENT_INDEXING = 1;
+
+  public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
+    super(context, config, table, instantTime);
+  }
+
+  @Override
+  public Option<HoodieIndexCommitMetadata> execute() {
+    HoodieTimer indexTimer = new HoodieTimer();
+    indexTimer.startTimer();
+
+    HoodieInstant indexInstant = table.getActiveTimeline()
+        .filterPendingIndexTimeline()
+        .filter(instant -> instant.getTimestamp().equals(instantTime))
+        .lastInstant()
+        .orElseThrow(() -> new HoodieIndexException(String.format("No pending index instant found: %s", instantTime)));
+    ValidationUtils.checkArgument(HoodieInstant.State.INFLIGHT.equals(indexInstant.getState()),
+        String.format("Index instant %s already inflight", instantTime));
+    try {
+      // read HoodieIndexPlan assuming indexInstant is requested
+      // TODO: handle inflight instant, if it is inflight then throw error.
+      HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+      List<HoodieIndexPartitionInfo> indexPartitionInfos = indexPlan.getIndexPartitionInfos();
+      if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+        throw new HoodieIndexException(String.format("No partitions to index for instant: %s", instantTime));
+      }
+      // transition requested indexInstant to inflight
+      table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant, Option.empty());
+      // start indexing for each partition
+      HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
+          .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
+      metadataWriter.index(context, indexPartitionInfos);
+      // get all completed instants since the plan completed
+      // assumption is that all metadata partitions had same instant upto which they were scheduled to be indexed
+      String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
+      Stream<HoodieInstant> remainingInstantsToIndex = table.getActiveTimeline().getWriteTimeline().getReverseOrderedInstants()
+          .filter(instant -> instant.isCompleted() && HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), indexUptoInstant));
+      // reconcile with metadata table timeline
+      String metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(table.getMetaClient().getBasePath());
+      HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
+      Set<HoodieInstant> metadataCompletedTimeline = metadataMetaClient.getActiveTimeline()
+          .getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toSet());
+      List<HoodieInstant> finalRemainingInstantsToIndex = remainingInstantsToIndex.map(
+          instant -> new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, instant.getTimestamp())
+      ).filter(instant -> !metadataCompletedTimeline.contains(instant)).collect(Collectors.toList());
+
+      // index all remaining instants with a timeout
+      ExecutorService executorService = Executors.newFixedThreadPool(MAX_CONCURRENT_INDEXING);
+      Future<?> postRequestIndexingTaskFuture = executorService.submit(new PostRequestIndexingTask(metadataWriter, finalRemainingInstantsToIndex));
+      try {
+        // TODO: configure timeout
+        postRequestIndexingTaskFuture.get(60, TimeUnit.SECONDS);

Review comment:
       60 secs is too short. If there are 100+ instants to catch up, would we complete in 60 secs?
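
   A minimal sketch of how the hard-coded 60 seconds could be replaced by a caller-supplied timeout. It uses only `java.util.concurrent`; the helper class and method names are hypothetical, and where the timeout value would come from (for example a write config) is left open as an assumption.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CatchUpTimeout {
  // Hypothetical helper: returns true if the task finished within the timeout,
  // false if it timed out or the caller was interrupted (the task is cancelled).
  static boolean runWithTimeout(Runnable task, long timeout, TimeUnit unit) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<?> future = executor.submit(task);
    try {
      future.get(timeout, unit);
      return true;
    } catch (TimeoutException e) {
      future.cancel(true);
      return false;
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can react to it.
      Thread.currentThread().interrupt();
      future.cancel(true);
      return false;
    } catch (ExecutionException e) {
      throw new IllegalStateException("Catch-up task failed", e.getCause());
    } finally {
      executor.shutdownNow();
    }
  }
}
```

   The timeout could then scale with the number of instants left to index instead of being a fixed constant.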

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -641,12 +663,22 @@ private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, Metadata
     }
   }
 
+  public void dropIndex(List<MetadataPartitionType> indexesToDrop) throws IOException {
+    // TODO: update table config and do it in a transaction

Review comment:
       If a writer is holding onto an instance of hoodieTableConfig, it may not refresh it from time to time, right? So if a partition was deleted midway, when the writer tries to apply a commit to the metadata table, won't hoodieTableConfig.getMetadataPartitionsToUpdate() return stale values?
   Do we ensure that such a flow succeeds even if there are partitions to update but the actual MD partition has been deleted?
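
   One way to picture a guard against the stale-config case is sketched below. It assumes the writer can obtain the set of metadata partitions that currently exist (for example by re-reading the table config before committing). The class and method are hypothetical and only illustrate the idea; they are not code from this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class StalePartitionFilter {
  // Hypothetical guard: keep only the cached partitions that still exist, so a
  // commit is not applied to a metadata partition dropped by another process.
  static List<String> partitionsSafeToUpdate(List<String> cachedPartitions, Set<String> existingPartitions) {
    List<String> safe = new ArrayList<>();
    for (String partition : cachedPartitions) {
      if (existingPartitions.contains(partition)) {
        safe.add(partition);
      }
    }
    return safe;
  }
}
```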

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Schedules INDEX action.
+ * <li>
+ *   1. Fetch last completed instant on data timeline.
+ *   2. Write the index plan to the <instant>.index.requested.
+ *   3. Initialize filegroups for the enabled partition types within a transaction.
+ * </li>
+ */
+public class ScheduleIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexPlan>> {
+
+  private static final Logger LOG = LogManager.getLogger(ScheduleIndexActionExecutor.class);
+  private static final Integer INDEX_PLAN_VERSION_1 = 1;
+  private static final Integer LATEST_INDEX_PLAN_VERSION = INDEX_PLAN_VERSION_1;
+
+  private final List<MetadataPartitionType> partitionsToIndex;
+  private final TransactionManager txnManager;
+
+  public ScheduleIndexActionExecutor(HoodieEngineContext context,
+                                     HoodieWriteConfig config,
+                                     HoodieTable<T, I, K, O> table,
+                                     String instantTime,
+                                     List<MetadataPartitionType> partitionsToIndex) {
+    super(context, config, table, instantTime);
+    this.partitionsToIndex = partitionsToIndex;
+    this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
+  }
+
+  @Override
+  public Option<HoodieIndexPlan> execute() {
+    // validate partitionsToIndex
+    if (!MetadataPartitionType.allPaths().containsAll(partitionsToIndex)) {
+      throw new HoodieIndexException("Not all partitions are valid: " + partitionsToIndex);
+    }
+    // get last completed instant
+    Option<HoodieInstant> indexUptoInstant = table.getActiveTimeline().filterCompletedInstants().lastInstant();
+    if (indexUptoInstant.isPresent()) {
+      final HoodieInstant indexInstant = HoodieTimeline.getIndexRequestedInstant(instantTime);
+      // for each partitionToIndex add that time to the plan
+      List<HoodieIndexPartitionInfo> indexPartitionInfos = partitionsToIndex.stream()
+          .map(p -> new HoodieIndexPartitionInfo(LATEST_INDEX_PLAN_VERSION, p.getPartitionPath(), indexUptoInstant.get().getTimestamp()))
+          .collect(Collectors.toList());
+      HoodieIndexPlan indexPlan = new HoodieIndexPlan(LATEST_INDEX_PLAN_VERSION, indexPartitionInfos);
+      try {
+        table.getActiveTimeline().saveToPendingIndexCommit(indexInstant, TimelineMetadataUtils.serializeIndexPlan(indexPlan));
+      } catch (IOException e) {
+        LOG.error("Error while saving index requested file", e);
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+      table.getMetaClient().reloadActiveTimeline();
+
+      // start initializing filegroups
+      // 1. get metadata writer
+      HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
+          .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
+      // 2. take a lock --> begin tx (data table)
+      try {
+        this.txnManager.beginTransaction(Option.of(indexInstant), Option.empty());
+        // 3. initialize filegroups as per plan for the enabled partition types
+        for (MetadataPartitionType partitionType : partitionsToIndex) {
+          metadataWriter.initializeFileGroups(table.getMetaClient(), partitionType, indexInstant.getTimestamp(), 1);

Review comment:
       I guess the last arg should be partitionType.getFileGroupCount().
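
   For illustration, a self-contained sketch of an enum that carries a per-partition file group count, so callers do not pass a literal `1`. The enum name and the counts are made up for this example and do not reflect the values in `MetadataPartitionType`.

```java
enum SketchPartitionType {
  // Counts below are illustrative only.
  FILES("files", 1),
  COLUMN_STATS("column_stats", 2),
  BLOOM_FILTERS("bloom_filters", 4);

  private final String partitionPath;
  private final int fileGroupCount;

  SketchPartitionType(String partitionPath, int fileGroupCount) {
    this.partitionPath = partitionPath;
    this.fileGroupCount = fileGroupCount;
  }

  String getPartitionPath() {
    return partitionPath;
  }

  // Callers would pass this value instead of a hard-coded constant.
  int getFileGroupCount() {
    return fileGroupCount;
  }
}
```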
   

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+public class HoodieIndexer {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+
+  private final HoodieIndexer.Config cfg;
+  private TypedProperties props;
+  private final JavaSparkContext jsc;
+  private final HoodieTableMetaClient metaClient;
+
+  public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    this.cfg = cfg;
+    this.jsc = jsc;
+    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+  }
+
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
+    public String basePath = null;
+    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
+    public String tableName = null;
+    @Parameter(names = {"--instant-time", "-it"}, description = "Indexing Instant time")
+    public String indexInstantTime = null;
+    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
+    public int parallelism = 1;
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+    public String sparkMaster = null;
+    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
+    public String sparkMemory = null;
+    @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
+    public int retry = 0;
+    @Parameter(names = {"--schedule", "-sc"}, description = "Schedule indexing")
+    public Boolean runSchedule = false;
+    @Parameter(names = {"--strategy", "-st"}, description = "Comma-separated index types to be built, e.g. BLOOM,FILES,COLSTATS")

Review comment:
       +1

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+  private static final Logger LOG = LogManager.getLogger(RunIndexActionExecutor.class);
+  private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+  private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION = INDEX_COMMIT_METADATA_VERSION_1;
+  private static final int MAX_CONCURRENT_INDEXING = 1;
+
+  public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
+    super(context, config, table, instantTime);
+  }
+
+  @Override
+  public Option<HoodieIndexCommitMetadata> execute() {
+    HoodieTimer indexTimer = new HoodieTimer();
+    indexTimer.startTimer();
+
+    HoodieInstant indexInstant = table.getActiveTimeline()
+        .filterPendingIndexTimeline()
+        .filter(instant -> instant.getTimestamp().equals(instantTime))
+        .lastInstant()
+        .orElseThrow(() -> new HoodieIndexException(String.format("No pending index instant found: %s", instantTime)));
+    ValidationUtils.checkArgument(HoodieInstant.State.INFLIGHT.equals(indexInstant.getState()),
+        String.format("Index instant %s already inflight", instantTime));
+    try {
+      // read HoodieIndexPlan assuming indexInstant is requested
+      // TODO: handle inflight instant, if it is inflight then throw error.
+      HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+      List<HoodieIndexPartitionInfo> indexPartitionInfos = indexPlan.getIndexPartitionInfos();
+      if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+        throw new HoodieIndexException(String.format("No partitions to index for instant: %s", instantTime));
+      }
+      // transition requested indexInstant to inflight
+      table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant, Option.empty());
+      // start indexing for each partition
+      HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
+          .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
+      metadataWriter.index(context, indexPartitionInfos);
+      // get all completed instants since the plan completed
+      // assumption is that all metadata partitions had same instant upto which they were scheduled to be indexed
+      String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
+      Stream<HoodieInstant> remainingInstantsToIndex = table.getActiveTimeline().getWriteTimeline().getReverseOrderedInstants()

Review comment:
       There could be some instants in the data table timeline that got archived. Did we consider those scenarios?
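
   A conservative check for the archival case might look like the sketch below. It assumes instant timestamps are digit strings of the same width, so that string comparison matches chronological order, and that the earliest instant still on the active timeline is known. The class and method names are hypothetical.

```java
final class ArchivalCheck {
  // Returns true when instants after indexUptoInstant may already have left the
  // active timeline, i.e. the earliest active instant is later than
  // indexUptoInstant. This is conservative: true means "possibly", not "certainly".
  static boolean mayHaveArchivedGap(String indexUptoInstant, String earliestActiveInstant) {
    return earliestActiveInstant.compareTo(indexUptoInstant) > 0;
  }
}
```

   When the check returns true, the catch-up step could either also consult the archived timeline or fail fast and ask for the index to be rescheduled.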

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+  private static final Logger LOG = LogManager.getLogger(RunIndexActionExecutor.class);
+  private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+  private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION = INDEX_COMMIT_METADATA_VERSION_1;
+  private static final int MAX_CONCURRENT_INDEXING = 1;
+
+  public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
+    super(context, config, table, instantTime);
+  }
+
+  @Override
+  public Option<HoodieIndexCommitMetadata> execute() {
+    HoodieTimer indexTimer = new HoodieTimer();
+    indexTimer.startTimer();
+
+    HoodieInstant indexInstant = table.getActiveTimeline()
+        .filterPendingIndexTimeline()
+        .filter(instant -> instant.getTimestamp().equals(instantTime))
+        .lastInstant()
+        .orElseThrow(() -> new HoodieIndexException(String.format("No pending index instant found: %s", instantTime)));
+    ValidationUtils.checkArgument(HoodieInstant.State.INFLIGHT.equals(indexInstant.getState()),
+        String.format("Index instant %s already inflight", instantTime));
+    try {
+      // read HoodieIndexPlan assuming indexInstant is requested
+      // TODO: handle inflight instant, if it is inflight then throw error.
+      HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+      List<HoodieIndexPartitionInfo> indexPartitionInfos = indexPlan.getIndexPartitionInfos();
+      if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+        throw new HoodieIndexException(String.format("No partitions to index for instant: %s", instantTime));
+      }
+      // transition requested indexInstant to inflight
+      table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant, Option.empty());
+      // start indexing for each partition
+      HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
+          .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
+      metadataWriter.index(context, indexPartitionInfos);
+      // get all completed instants since the plan completed
+      // assumption is that all metadata partitions had same instant upto which they were scheduled to be indexed
+      String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
+      Stream<HoodieInstant> remainingInstantsToIndex = table.getActiveTimeline().getWriteTimeline().getReverseOrderedInstants()
+          .filter(instant -> instant.isCompleted() && HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), indexUptoInstant));
+      // reconcile with metadata table timeline
+      String metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(table.getMetaClient().getBasePath());
+      HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
+      Set<HoodieInstant> metadataCompletedTimeline = metadataMetaClient.getActiveTimeline()
+          .getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toSet());
+      List<HoodieInstant> finalRemainingInstantsToIndex = remainingInstantsToIndex.map(

Review comment:
       I see we fetch all instants (pending and completed) at L106, so I assume finalRemainingInstantsToIndex could contain inflight commits as well. If so, there is a chance that by the time PostRequestIndexingTask executes, the actual writer has already applied the commit to the MDT. Have we considered this scenario?
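
       To make the concern concrete, here is a minimal sketch (plain Java collections, not Hudi code) of the filtering I would expect before the catch-up task runs: keep only completed data-table instants after the plan's instant, and drop any that the MDT has already completed. `SketchInstant`, `CatchUpFilter` and `selectInstantsToApply` are hypothetical names used only for illustration, and the sketch assumes instant timestamps are fixed-width strings so that lexicographic comparison matches time order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a data-timeline instant; not Hudi's HoodieInstant.
final class SketchInstant {
  final String timestamp;
  final boolean completed;

  SketchInstant(String timestamp, boolean completed) {
    this.timestamp = timestamp;
    this.completed = completed;
  }
}

final class CatchUpFilter {
  private CatchUpFilter() {}

  // Returns timestamps that still need to be applied to the metadata table:
  // completed on the data timeline, strictly after indexUptoInstant, and not
  // already completed on the metadata timeline.
  static List<String> selectInstantsToApply(List<SketchInstant> dataTimeline,
                                            String indexUptoInstant,
                                            Set<String> metadataCompleted) {
    List<String> result = new ArrayList<>();
    for (SketchInstant instant : dataTimeline) {
      // Assumption: fixed-width timestamps, so String ordering is time ordering.
      boolean afterPlan = instant.timestamp.compareTo(indexUptoInstant) > 0;
      if (instant.completed && afterPlan && !metadataCompleted.contains(instant.timestamp)) {
        result.add(instant.timestamp);
      }
    }
    return result;
  }
}
```

       Note that a snapshot-based filter like this only narrows the window. If a regular writer commits to the MDT after the snapshot is taken, the same check would have to be repeated at apply time.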
   

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.MetadataPartitionType;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+public class HoodieIndexer {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+
+  private final HoodieIndexer.Config cfg;
+  private TypedProperties props;
+  private final JavaSparkContext jsc;
+  private final HoodieTableMetaClient metaClient;
+
+  public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    this.cfg = cfg;
+    this.jsc = jsc;
+    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+  }
+
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
+    public String basePath = null;
+    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
+    public String tableName = null;
+    @Parameter(names = {"--instant-time", "-it"}, description = "Indexing Instant time")
+    public String indexInstantTime = null;
+    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
+    public int parallelism = 1;
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+    public String sparkMaster = null;
+    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
+    public String sparkMemory = null;
+    @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
+    public int retry = 0;
+    @Parameter(names = {"--schedule", "-sc"}, description = "Schedule indexing")
+    public Boolean runSchedule = false;
+    @Parameter(names = {"--strategy", "-st"}, description = "Comma-separated index types to be built, e.g. BLOOM,FILES,COLSTATS")
+    public String indexTypes = null;
+    @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" to generate an indexing plan; "
+        + "Set \"execute\" to execute the indexing plan at the given instant, which means --instant-time is required here; "
+        + "Set \"scheduleAndExecute\" to generate an indexing plan first and execute that plan immediately")

Review comment:
       Is there a need to add a cancelIndexing operation?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -121,6 +121,15 @@ protected void initRegistry() {
     }
   }
 
+  @Override
+  protected void scheduleIndex(List<String> partitions) {
+    ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is not fully initialized yet.");

Review comment:
       Can you confirm this: for "files", do we always do synchronous initialization?
   What happens if, during synchronous initialization of the metadata table, someone schedules "col_stats" partition indexing via the tool? Do we guard the writes/critical section with a lock?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+  private static final Logger LOG = LogManager.getLogger(RunIndexActionExecutor.class);
+  private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+  private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION = INDEX_COMMIT_METADATA_VERSION_1;
+  private static final int MAX_CONCURRENT_INDEXING = 1;
+
+  public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
+    super(context, config, table, instantTime);
+  }
+
+  @Override
+  public Option<HoodieIndexCommitMetadata> execute() {
+    HoodieTimer indexTimer = new HoodieTimer();
+    indexTimer.startTimer();
+
+    HoodieInstant indexInstant = table.getActiveTimeline()
+        .filterPendingIndexTimeline()
+        .filter(instant -> instant.getTimestamp().equals(instantTime))
+        .lastInstant()
+        .orElseThrow(() -> new HoodieIndexException(String.format("No pending index instant found: %s", instantTime)));
+    ValidationUtils.checkArgument(HoodieInstant.State.INFLIGHT.equals(indexInstant.getState()),
+        String.format("Index instant %s already inflight", instantTime));
+    try {
+      // read HoodieIndexPlan assuming indexInstant is requested
+      // TODO: handle inflight instant, if it is inflight then throw error.
+      HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+      List<HoodieIndexPartitionInfo> indexPartitionInfos = indexPlan.getIndexPartitionInfos();
+      if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+        throw new HoodieIndexException(String.format("No partitions to index for instant: %s", instantTime));
+      }
+      // transition requested indexInstant to inflight
+      table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant, Option.empty());
+      // start indexing for each partition
+      HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
+          .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
+      metadataWriter.index(context, indexPartitionInfos);
+      // get all completed instants since the plan completed
+      // assumption is that all metadata partitions had same instant upto which they were scheduled to be indexed
+      String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
+      Stream<HoodieInstant> remainingInstantsToIndex = table.getActiveTimeline().getWriteTimeline().getReverseOrderedInstants()
+          .filter(instant -> instant.isCompleted() && HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), indexUptoInstant));
+      // reconcile with metadata table timeline
+      String metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(table.getMetaClient().getBasePath());
+      HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
+      Set<HoodieInstant> metadataCompletedTimeline = metadataMetaClient.getActiveTimeline()
+          .getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toSet());
+      List<HoodieInstant> finalRemainingInstantsToIndex = remainingInstantsToIndex.map(
+          instant -> new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, instant.getTimestamp())
+      ).filter(instant -> !metadataCompletedTimeline.contains(instant)).collect(Collectors.toList());
+
+      // index all remaining instants with a timeout
+      ExecutorService executorService = Executors.newFixedThreadPool(MAX_CONCURRENT_INDEXING);
+      Future<?> postRequestIndexingTaskFuture = executorService.submit(new PostRequestIndexingTask(metadataWriter, finalRemainingInstantsToIndex));
+      try {
+        // TODO: configure timeout
+        postRequestIndexingTaskFuture.get(60, TimeUnit.SECONDS);
+      } catch (TimeoutException | InterruptedException | ExecutionException e) {
+        postRequestIndexingTaskFuture.cancel(true);
+      } finally {
+        executorService.shutdownNow();
+      }
+      Option<HoodieInstant> lastMetadataInstant = metadataMetaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
+      if (lastMetadataInstant.isPresent() && indexUptoInstant.equals(lastMetadataInstant.get().getTimestamp())) {
+        return Option.of(HoodieIndexCommitMetadata.newBuilder()
+            .setVersion(LATEST_INDEX_COMMIT_METADATA_VERSION).setIndexPartitionInfos(indexPartitionInfos).build());
+      }
+      List<HoodieIndexPartitionInfo> finalIndexPartitionInfos = indexPartitionInfos.stream()
+          .map(info -> new HoodieIndexPartitionInfo(
+              info.getVersion(),
+              info.getMetadataPartitionPath(),
+              lastMetadataInstant.get().getTimestamp())).collect(Collectors.toList());
+      return Option.of(HoodieIndexCommitMetadata.newBuilder()
+          .setVersion(LATEST_INDEX_COMMIT_METADATA_VERSION).setIndexPartitionInfos(finalIndexPartitionInfos).build());
+    } catch (IOException e) {
+      throw new HoodieIndexException(String.format("Unable to index instant: %s", indexInstant));
+    }
+  }
+
+  class PostRequestIndexingTask implements Runnable {
+
+    private final HoodieTableMetadataWriter metadataWriter;
+    private final List<HoodieInstant> instantsToIndex;
+
+    PostRequestIndexingTask(HoodieTableMetadataWriter metadataWriter, List<HoodieInstant> instantsToIndex) {
+      this.metadataWriter = metadataWriter;
+      this.instantsToIndex = instantsToIndex;
+    }
+
+    @Override
+    public void run() {
+      while (!Thread.interrupted()) {
+        for (HoodieInstant instant : instantsToIndex) {

Review comment:
       Don't we need to take a lock here?
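
       A rough sketch of what I mean, assuming the check "has the MDT already completed this instant?" and the apply step must happen atomically with respect to regular writers. The in-process `ReentrantLock` below is only a stand-in for whatever table-level lock the writers use; `LockedCatchUp`, `writerCommit`, `catchUp` and `isApplied` are hypothetical names, not Hudi APIs.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

final class LockedCatchUp {
  // Stand-in for a table-level lock shared by the indexer and regular writers.
  private final ReentrantLock lock = new ReentrantLock();
  // Instants completed on the metadata table; guarded by 'lock'.
  private final Set<String> metadataCompleted = new HashSet<>();

  // Simulates a regular writer committing an instant to the metadata table.
  void writerCommit(String timestamp) {
    lock.lock();
    try {
      metadataCompleted.add(timestamp);
    } finally {
      lock.unlock();
    }
  }

  // Applies each instant unless it is already completed; returns how many
  // instants this call actually applied.
  int catchUp(List<String> instantsToIndex) {
    int applied = 0;
    for (String timestamp : instantsToIndex) {
      lock.lock();
      try {
        // Check and apply under the same lock so a writer cannot slip in between.
        if (!metadataCompleted.contains(timestamp)) {
          metadataCompleted.add(timestamp);
          applied++;
        }
      } finally {
        lock.unlock();
      }
    }
    return applied;
  }

  boolean isApplied(String timestamp) {
    lock.lock();
    try {
      return metadataCompleted.contains(timestamp);
    } finally {
      lock.unlock();
    }
  }
}
```

       Taking the lock per instant rather than around the whole loop keeps regular writers from being blocked for the full catch-up duration, at the cost of more lock acquisitions.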

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+  private static final Logger LOG = LogManager.getLogger(RunIndexActionExecutor.class);
+  private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+  private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION = INDEX_COMMIT_METADATA_VERSION_1;
+  private static final int MAX_CONCURRENT_INDEXING = 1;
+
+  public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
+    super(context, config, table, instantTime);
+  }
+
+  @Override
+  public Option<HoodieIndexCommitMetadata> execute() {
+    HoodieTimer indexTimer = new HoodieTimer();
+    indexTimer.startTimer();
+
+    HoodieInstant indexInstant = table.getActiveTimeline()
+        .filterPendingIndexTimeline()
+        .filter(instant -> instant.getTimestamp().equals(instantTime))
+        .lastInstant()
+        .orElseThrow(() -> new HoodieIndexException(String.format("No pending index instant found: %s", instantTime)));
+    ValidationUtils.checkArgument(HoodieInstant.State.INFLIGHT.equals(indexInstant.getState()),
+        String.format("Index instant %s already inflight", instantTime));
+    try {
+      // read HoodieIndexPlan assuming indexInstant is requested
+      // TODO: handle inflight instant, if it is inflight then throw error.
+      HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+      List<HoodieIndexPartitionInfo> indexPartitionInfos = indexPlan.getIndexPartitionInfos();
+      if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+        throw new HoodieIndexException(String.format("No partitions to index for instant: %s", instantTime));
+      }
+      // transition requested indexInstant to inflight
+      table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant, Option.empty());
+      // start indexing for each partition
+      HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
+          .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
+      metadataWriter.index(context, indexPartitionInfos);
+      // get all completed instants since the plan completed
+      // assumption is that all metadata partitions had same instant upto which they were scheduled to be indexed
+      String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
+      Stream<HoodieInstant> remainingInstantsToIndex = table.getActiveTimeline().getWriteTimeline().getReverseOrderedInstants()
+          .filter(instant -> instant.isCompleted() && HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), indexUptoInstant));
+      // reconcile with metadata table timeline
+      String metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(table.getMetaClient().getBasePath());
+      HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
+      Set<HoodieInstant> metadataCompletedTimeline = metadataMetaClient.getActiveTimeline()
+          .getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toSet());
+      List<HoodieInstant> finalRemainingInstantsToIndex = remainingInstantsToIndex.map(
+          instant -> new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, instant.getTimestamp())
+      ).filter(instant -> !metadataCompletedTimeline.contains(instant)).collect(Collectors.toList());
+
+      // index all remaining instants with a timeout
+      ExecutorService executorService = Executors.newFixedThreadPool(MAX_CONCURRENT_INDEXING);
+      Future<?> postRequestIndexingTaskFuture = executorService.submit(new PostRequestIndexingTask(metadataWriter, finalRemainingInstantsToIndex));
+      try {
+        // TODO: configure timeout
+        postRequestIndexingTaskFuture.get(60, TimeUnit.SECONDS);
+      } catch (TimeoutException | InterruptedException | ExecutionException e) {
+        postRequestIndexingTaskFuture.cancel(true);
+      } finally {
+        executorService.shutdownNow();
+      }
+      Option<HoodieInstant> lastMetadataInstant = metadataMetaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
+      if (lastMetadataInstant.isPresent() && indexUptoInstant.equals(lastMetadataInstant.get().getTimestamp())) {
+        return Option.of(HoodieIndexCommitMetadata.newBuilder()
+            .setVersion(LATEST_INDEX_COMMIT_METADATA_VERSION).setIndexPartitionInfos(indexPartitionInfos).build());
+      }
+      List<HoodieIndexPartitionInfo> finalIndexPartitionInfos = indexPartitionInfos.stream()
+          .map(info -> new HoodieIndexPartitionInfo(
+              info.getVersion(),
+              info.getMetadataPartitionPath(),
+              lastMetadataInstant.get().getTimestamp())).collect(Collectors.toList());
+      return Option.of(HoodieIndexCommitMetadata.newBuilder()
+          .setVersion(LATEST_INDEX_COMMIT_METADATA_VERSION).setIndexPartitionInfos(finalIndexPartitionInfos).build());
+    } catch (IOException e) {

Review comment:
       Sorry, where are we checking for holes and aborting the index build?
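
       For reference, the kind of check I was expecting looks roughly like the sketch below. It assumes a "hole" means an instant that is still pending on the data timeline but has a timestamp earlier than the instant the index was scheduled up to, and that timestamps are fixed-width strings. `HoleCheck`, `findHoles` and `abortIfHoles` are hypothetical names for illustration only.

```java
import java.util.List;
import java.util.stream.Collectors;

final class HoleCheck {
  private HoleCheck() {}

  // Returns pending timestamps strictly before indexUptoInstant, in sorted order.
  static List<String> findHoles(List<String> pendingTimestamps, String indexUptoInstant) {
    return pendingTimestamps.stream()
        // Assumption: fixed-width timestamps, so String ordering is time ordering.
        .filter(ts -> ts.compareTo(indexUptoInstant) < 0)
        .sorted()
        .collect(Collectors.toList());
  }

  // Throws if any hole exists, so the caller can abort the index build.
  static void abortIfHoles(List<String> pendingTimestamps, String indexUptoInstant) {
    List<String> holes = findHoles(pendingTimestamps, indexUptoInstant);
    if (!holes.isEmpty()) {
      throw new IllegalStateException(
          "Aborting index build; pending instants before " + indexUptoInstant + ": " + holes);
    }
  }
}
```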

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
##########
@@ -343,6 +347,16 @@ public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollb
         deleteInstants, skipLocking).execute();
   }
 
+  @Override
+  public Option<HoodieIndexPlan> scheduleIndex(HoodieEngineContext context, String indexInstantTime, List<String> partitionsToIndex) {
+    return new ScheduleIndexActionExecutor<>(context, config, this, indexInstantTime, partitionsToIndex).execute();

Review comment:
       This is just one line, so I don't think it's a must to move it to the base class. I will leave it to you, though. We already have similar code across all engines.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Reads the index plan and executes the plan.
+ * It also reconciles updates on data timeline while indexing was in progress.
+ */
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+  private static final Logger LOG = LogManager.getLogger(RunIndexActionExecutor.class);
+  private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+  private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION = INDEX_COMMIT_METADATA_VERSION_1;
+  private static final int MAX_CONCURRENT_INDEXING = 1;
+
+  public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
+    super(context, config, table, instantTime);
+  }
+
+  @Override
+  public Option<HoodieIndexCommitMetadata> execute() {
+    HoodieTimer indexTimer = new HoodieTimer();
+    indexTimer.startTimer();
+
+    HoodieInstant indexInstant = table.getActiveTimeline()
+        .filterPendingIndexTimeline()
+        .filter(instant -> instant.getTimestamp().equals(instantTime))
+        .lastInstant()
+        .orElseThrow(() -> new HoodieIndexException(String.format("No pending index instant found: %s", instantTime)));
+    ValidationUtils.checkArgument(HoodieInstant.State.INFLIGHT.equals(indexInstant.getState()),
+        String.format("Index instant %s already inflight", instantTime));
+    try {
+      // read HoodieIndexPlan assuming indexInstant is requested
+      // TODO: handle inflight instant, if it is inflight then throw error.
+      HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+      List<HoodieIndexPartitionInfo> indexPartitionInfos = indexPlan.getIndexPartitionInfos();
+      if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+        throw new HoodieIndexException(String.format("No partitions to index for instant: %s", instantTime));
+      }
+      // transition requested indexInstant to inflight
+      table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant, Option.empty());
+      // start indexing for each partition
+      HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
+          .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
+      metadataWriter.index(context, indexPartitionInfos);

Review comment:
       Don't we need locking here?



