You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/03/11 09:54:40 UTC

[GitHub] [hudi] prashantwason commented on a change in pull request #4693: [WIP][HUDI-3175][RFC-45] Implement async metadata indexing

prashantwason commented on a change in pull request #4693:
URL: https://github.com/apache/hudi/pull/4693#discussion_r824490905



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -855,6 +856,21 @@ public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String
     return scheduleTableService(instantTime, extraMetadata, TableServiceType.COMPACT).isPresent();
   }
 
+  public Option<String> scheduleIndexing(List<String> partitions) {
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    return scheduleIndexingAtInstant(partitions, instantTime) ? Option.of(instantTime) : Option.empty();
+  }
+
+  private boolean scheduleIndexingAtInstant(List<String> partitionsToIndex, String instantTime) throws HoodieIOException {

Review comment:
       Since this is a private function that is only called from the function above, why not merge it into scheduleIndexing?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+  private static final Logger LOG = LogManager.getLogger(RunIndexActionExecutor.class);
+  private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+  private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION = INDEX_COMMIT_METADATA_VERSION_1;
+  private static final int MAX_CONCURRENT_INDEXING = 1;
+
+  public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
+    super(context, config, table, instantTime);
+  }
+
+  @Override
+  public Option<HoodieIndexCommitMetadata> execute() {
+    HoodieTimer indexTimer = new HoodieTimer();
+    indexTimer.startTimer();
+
+    HoodieInstant indexInstant = table.getActiveTimeline()
+        .filterPendingIndexTimeline()
+        .filter(instant -> instant.getTimestamp().equals(instantTime))
+        .lastInstant()
+        .orElseThrow(() -> new HoodieIndexException(String.format("No pending index instant found: %s", instantTime)));
+    ValidationUtils.checkArgument(HoodieInstant.State.INFLIGHT.equals(indexInstant.getState()),
+        String.format("Index instant %s already inflight", instantTime));
+    try {
+      // read HoodieIndexPlan assuming indexInstant is requested
+      // TODO: handle inflight instant, if it is inflight then throw error.
+      HoodieIndexPlan indexPlan = TimelineMetadataUtils.deserializeIndexPlan(table.getActiveTimeline().readIndexPlanAsBytes(indexInstant).get());
+      List<HoodieIndexPartitionInfo> indexPartitionInfos = indexPlan.getIndexPartitionInfos();
+      if (indexPartitionInfos == null || indexPartitionInfos.isEmpty()) {
+        throw new HoodieIndexException(String.format("No partitions to index for instant: %s", instantTime));
+      }
+      // transition requested indexInstant to inflight
+      table.getActiveTimeline().transitionIndexRequestedToInflight(indexInstant, Option.empty());
+      // start indexing for each partition
+      HoodieTableMetadataWriter metadataWriter = table.getMetadataWriter(instantTime)
+          .orElseThrow(() -> new HoodieIndexException(String.format("Could not get metadata writer to run index action for instant: %s", instantTime)));
+      metadataWriter.index(context, indexPartitionInfos);
+      // get all completed instants since the plan completed
+      // assumption is that all metadata partitions had same instant upto which they were scheduled to be indexed
+      String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
+      Stream<HoodieInstant> remainingInstantsToIndex = table.getActiveTimeline().getWriteTimeline().getReverseOrderedInstants()
+          .filter(instant -> instant.isCompleted() && HoodieActiveTimeline.GREATER_THAN.test(instant.getTimestamp(), indexUptoInstant));
+      // reconcile with metadata table timeline
+      String metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(table.getMetaClient().getBasePath());
+      HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataBasePath).build();
+      Set<HoodieInstant> metadataCompletedTimeline = metadataMetaClient.getActiveTimeline()
+          .getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toSet());
+      List<HoodieInstant> finalRemainingInstantsToIndex = remainingInstantsToIndex.map(
+          instant -> new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, instant.getTimestamp())
+      ).filter(instant -> !metadataCompletedTimeline.contains(instant)).collect(Collectors.toList());
+
+      // index all remaining instants with a timeout
+      ExecutorService executorService = Executors.newFixedThreadPool(MAX_CONCURRENT_INDEXING);
+      Future<?> postRequestIndexingTaskFuture = executorService.submit(new PostRequestIndexingTask(metadataWriter, finalRemainingInstantsToIndex));
+      try {
+        // TODO: configure timeout
+        postRequestIndexingTaskFuture.get(60, TimeUnit.SECONDS);
+      } catch (TimeoutException | InterruptedException | ExecutionException e) {
+        postRequestIndexingTaskFuture.cancel(true);
+      } finally {
+        executorService.shutdownNow();
+      }
+      Option<HoodieInstant> lastMetadataInstant = metadataMetaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();

Review comment:
       Can you add some comments describing the logic here? 

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
##########
@@ -337,6 +339,16 @@ public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollb
     return new CopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
   }
 
+  @Override
+  public Option<HoodieIndexPlan> scheduleIndex(HoodieEngineContext context, String indexInstantTime, List<String> partitionsToIndex) {
+    throw new HoodieNotSupportedException("Indexing is not supported for a Flink table yet.");

Review comment:
       Nit: say "Metadata indexing" instead of "Indexing" in this message.

##########
File path: hudi-common/src/main/avro/HoodieIndexPartitionInfo.avsc
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "namespace": "org.apache.hudi.avro.model",
+  "type": "record",
+  "name": "HoodieIndexPartitionInfo",
+  "fields": [

Review comment:
       Please add a "doc" attribute for each field.

##########
File path: hudi-common/src/main/avro/HoodieIndexPartitionInfo.avsc
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "namespace": "org.apache.hudi.avro.model",
+  "type": "record",
+  "name": "HoodieIndexPartitionInfo",
+  "fields": [
+    {
+      "name": "version",

Review comment:
       What is the use of the version field here?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+public class HoodieIndexer {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+
+  private final HoodieIndexer.Config cfg;
+  private TypedProperties props;
+  private final JavaSparkContext jsc;
+  private final HoodieTableMetaClient metaClient;
+
+  public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    this.cfg = cfg;
+    this.jsc = jsc;
+    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+  }
+
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
+    public String basePath = null;
+    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
+    public String tableName = null;
+    @Parameter(names = {"--instant-time", "-it"}, description = "Indexing Instant time")
+    public String indexInstantTime = null;
+    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
+    public int parallelism = 1;
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+    public String sparkMaster = null;
+    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
+    public String sparkMemory = null;
+    @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
+    public int retry = 0;
+    @Parameter(names = {"--schedule", "-sc"}, description = "Schedule indexing")
+    public Boolean runSchedule = false;
+    @Parameter(names = {"--strategy", "-st"}, description = "Comma-separated index types to be built, e.g. BLOOM,FILES,COLSTATS")
+    public String indexTypes = null;
+    @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" to generate an indexing plan; "
+        + "Set \"execute\" to execute the indexing plan at the given instant, which means --instant-time is required here; "
+        + "Set \"scheduleAndExecute\" to generate an indexing plan first and execute that plan immediately")
+    public String runningMode = null;
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+
+    @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+        + "hoodie client for compacting")
+    public String propsFilePath = null;
+
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+        + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
+    public List<String> configs = new ArrayList<>();
+  }
+
+  public static void main(String[] args) {
+    final HoodieIndexer.Config cfg = new HoodieIndexer.Config();
+    JCommander cmd = new JCommander(cfg, null, args);
+
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+
+    final JavaSparkContext jsc = UtilHelpers.buildSparkContext("indexing-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
+    HoodieIndexer indexer = new HoodieIndexer(jsc, cfg);
+    int result = indexer.start(cfg.retry);
+    String resultMsg = String.format("Indexing with basePath: %s, tableName: %s, runningMode: %s",
+        cfg.basePath, cfg.tableName, cfg.runningMode);
+    if (result == -1) {
+      LOG.error(resultMsg + " failed");
+    } else {
+      LOG.info(resultMsg + " success");
+    }
+    jsc.stop();
+  }
+
+  private int start(int retry) {
+    return UtilHelpers.retry(retry, () -> {
+      switch (cfg.runningMode.toLowerCase()) {
+        case SCHEDULE: {
+          LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
+          Option<String> instantTime = scheduleIndexing(jsc);
+          int result = instantTime.isPresent() ? 0 : -1;
+          if (result == 0) {
+            LOG.info("The schedule instant time is " + instantTime.get());
+          }
+          return result;
+        }
+        case SCHEDULE_AND_EXECUTE: {
+          LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");

Review comment:
       This could simply be scheduleIndexing(..) followed by runIndexing(..).
      

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -855,6 +855,17 @@ public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String
     return scheduleTableService(instantTime, extraMetadata, TableServiceType.COMPACT).isPresent();
   }
 
+  public Option<String> scheduleIndexing(List<String> partitions) {

Review comment:
       +1
   
   

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -121,6 +121,15 @@ protected void initRegistry() {
     }
   }
 
+  @Override
+  protected void scheduleIndex(List<String> partitions) {
+    ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is not fully initialized yet.");

Review comment:
       Does this mean that the "files" partition cannot be indexed through async indexing? (The RFC says so.)
   
   I feel the files partition is integral to every other partition and is always present if the metadata table (MT) is enabled. Also, the time needed to create the files partition is very low. So I would prefer that we keep it simple and always index the files partition inline.
   
   

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+  private static final Logger LOG = LogManager.getLogger(RunIndexActionExecutor.class);
+  private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+  private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION = INDEX_COMMIT_METADATA_VERSION_1;
+  private static final int MAX_CONCURRENT_INDEXING = 1;
+
+  public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
+    super(context, config, table, instantTime);
+  }
+
+  @Override
+  public Option<HoodieIndexCommitMetadata> execute() {
+    HoodieTimer indexTimer = new HoodieTimer();
+    indexTimer.startTimer();
+
+    HoodieInstant indexInstant = table.getActiveTimeline()
+        .filterPendingIndexTimeline()
+        .filter(instant -> instant.getTimestamp().equals(instantTime))
+        .lastInstant()
+        .orElseThrow(() -> new HoodieIndexException(String.format("No pending index instant found: %s", instantTime)));
+    ValidationUtils.checkArgument(HoodieInstant.State.INFLIGHT.equals(indexInstant.getState()),

Review comment:
       A better check may be to ensure that the instant state is REQUESTED.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.index;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class RunIndexActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieIndexCommitMetadata>> {
+
+  private static final Logger LOG = LogManager.getLogger(RunIndexActionExecutor.class);
+  private static final Integer INDEX_COMMIT_METADATA_VERSION_1 = 1;
+  private static final Integer LATEST_INDEX_COMMIT_METADATA_VERSION = INDEX_COMMIT_METADATA_VERSION_1;
+  private static final int MAX_CONCURRENT_INDEXING = 1;
+
+  public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
+    super(context, config, table, instantTime);
+  }
+
+  @Override
+  public Option<HoodieIndexCommitMetadata> execute() {
+    HoodieTimer indexTimer = new HoodieTimer();
+    indexTimer.startTimer();
+
+    HoodieInstant indexInstant = table.getActiveTimeline()
+        .filterPendingIndexTimeline()
+        .filter(instant -> instant.getTimestamp().equals(instantTime))
+        .lastInstant()
+        .orElseThrow(() -> new HoodieIndexException(String.format("No pending index instant found: %s", instantTime)));
+    ValidationUtils.checkArgument(HoodieInstant.State.INFLIGHT.equals(indexInstant.getState()),
+        String.format("Index instant %s already inflight", instantTime));
+    try {
+      // read HoodieIndexPlan assuming indexInstant is requested
+      // TODO: handle inflight instant, if it is inflight then throw error.

Review comment:
       This is already being done on line 83 above.

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
##########
@@ -337,6 +339,16 @@ public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollb
     return new CopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
   }
 
+  @Override
+  public Option<HoodieIndexPlan> scheduleIndex(HoodieEngineContext context, String indexInstantTime, List<String> partitionsToIndex) {
+    throw new HoodieNotSupportedException("Indexing is not supported for a Flink table yet.");
+  }
+
+  @Override
+  public Option<HoodieIndexCommitMetadata> index(HoodieEngineContext context, String indexInstantTime) {
+    throw new HoodieNotSupportedException("Indexing is not supported for a Flink table yet.");

Review comment:
       Nit: say "Metadata indexing" instead of "Indexing" in this message.

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
##########
@@ -343,6 +347,16 @@ public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollb
         deleteInstants, skipLocking).execute();
   }
 
+  @Override
+  public Option<HoodieIndexPlan> scheduleIndex(HoodieEngineContext context, String indexInstantTime, List<String> partitionsToIndex) {
+    return new ScheduleIndexActionExecutor<>(context, config, this, indexInstantTime, partitionsToIndex).execute();
+  }
+
+  @Override
+  public Option<HoodieIndexCommitMetadata> index(HoodieEngineContext context, String indexInstantTime) {
+    return new RunIndexActionExecutor<>(context, config, this, indexInstantTime).execute();

Review comment:
       Duplicate code with Java Table. Is there a base class that we can move this to?

##########
File path: hudi-common/src/main/avro/HoodieIndexCommitMetadata.avsc
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "namespace": "org.apache.hudi.avro.model",
+  "type": "record",
+  "name": "HoodieIndexCommitMetadata",
+  "fields": [

Review comment:
       Please add a "doc" attribute for each field.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+public class HoodieIndexer {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+
+  private final HoodieIndexer.Config cfg;
+  private TypedProperties props;
+  private final JavaSparkContext jsc;
+  private final HoodieTableMetaClient metaClient;
+
+  public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    this.cfg = cfg;
+    this.jsc = jsc;
+    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+  }
+
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
+    public String basePath = null;
+    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
+    public String tableName = null;
+    @Parameter(names = {"--instant-time", "-it"}, description = "Indexing Instant time")
+    public String indexInstantTime = null;
+    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
+    public int parallelism = 1;
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+    public String sparkMaster = null;
+    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
+    public String sparkMemory = null;
+    @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
+    public int retry = 0;
+    @Parameter(names = {"--schedule", "-sc"}, description = "Schedule indexing")
+    public Boolean runSchedule = false;
+    @Parameter(names = {"--strategy", "-st"}, description = "Comma-separated index types to be built, e.g. BLOOM,FILES,COLSTATS")
+    public String indexTypes = null;
+    @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" to generate an indexing plan; "
+        + "Set \"execute\" to execute the indexing plan at the given instant, which means --instant-time is required here; "
+        + "Set \"scheduleAndExecute\" to generate an indexing plan first and execute that plan immediately")
+    public String runningMode = null;
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+
+    @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+        + "hoodie client for compacting")
+    public String propsFilePath = null;
+
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+        + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
+    public List<String> configs = new ArrayList<>();
+  }
+
+  public static void main(String[] args) {
+    final HoodieIndexer.Config cfg = new HoodieIndexer.Config();
+    JCommander cmd = new JCommander(cfg, null, args);
+
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+
+    final JavaSparkContext jsc = UtilHelpers.buildSparkContext("indexing-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
+    HoodieIndexer indexer = new HoodieIndexer(jsc, cfg);
+    int result = indexer.start(cfg.retry);
+    String resultMsg = String.format("Indexing with basePath: %s, tableName: %s, runningMode: %s",
+        cfg.basePath, cfg.tableName, cfg.runningMode);
+    if (result == -1) {
+      LOG.error(resultMsg + " failed");
+    } else {
+      LOG.info(resultMsg + " success");
+    }
+    jsc.stop();
+  }
+
+  private int start(int retry) {
+    return UtilHelpers.retry(retry, () -> {
+      switch (cfg.runningMode.toLowerCase()) {
+        case SCHEDULE: {
+          LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");

Review comment:
       If you move all this code into scheduleIndexing(), you can simplify SCHEDULE_AND_EXECUTE and remove some code duplication.
   
   

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
##########
@@ -343,6 +347,16 @@ public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollb
         deleteInstants, skipLocking).execute();
   }
 
+  @Override
+  public Option<HoodieIndexPlan> scheduleIndex(HoodieEngineContext context, String indexInstantTime, List<String> partitionsToIndex) {
+    return new ScheduleIndexActionExecutor<>(context, config, this, indexInstantTime, partitionsToIndex).execute();

Review comment:
       Duplicate code with Java Table. Is there a base class that we can move this to?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+public class HoodieIndexer {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+
+  private final HoodieIndexer.Config cfg;
+  private TypedProperties props;
+  private final JavaSparkContext jsc;
+  private final HoodieTableMetaClient metaClient;
+
+  public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    this.cfg = cfg;
+    this.jsc = jsc;
+    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+  }
+
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
+    public String basePath = null;
+    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
+    public String tableName = null;
+    @Parameter(names = {"--instant-time", "-it"}, description = "Indexing Instant time")
+    public String indexInstantTime = null;
+    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
+    public int parallelism = 1;
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+    public String sparkMaster = null;
+    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
+    public String sparkMemory = null;
+    @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
+    public int retry = 0;
+    @Parameter(names = {"--schedule", "-sc"}, description = "Schedule indexing")
+    public Boolean runSchedule = false;
+    @Parameter(names = {"--strategy", "-st"}, description = "Comma-separated index types to be built, e.g. BLOOM,FILES,COLSTATS")

Review comment:
       Probably choose a more appropriate name, like "partitions" or "partitions-to-index".
   

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+public class HoodieIndexer {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+
+  private final HoodieIndexer.Config cfg;
+  private TypedProperties props;
+  private final JavaSparkContext jsc;
+  private final HoodieTableMetaClient metaClient;
+
+  public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    this.cfg = cfg;
+    this.jsc = jsc;
+    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+  }
+
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
+    public String basePath = null;
+    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
+    public String tableName = null;
+    @Parameter(names = {"--instant-time", "-it"}, description = "Indexing Instant time")
+    public String indexInstantTime = null;
+    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
+    public int parallelism = 1;
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+    public String sparkMaster = null;
+    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
+    public String sparkMemory = null;
+    @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
+    public int retry = 0;
+    @Parameter(names = {"--schedule", "-sc"}, description = "Schedule indexing")
+    public Boolean runSchedule = false;
+    @Parameter(names = {"--strategy", "-st"}, description = "Comma-separated index types to be built, e.g. BLOOM,FILES,COLSTATS")
+    public String indexTypes = null;
+    @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" to generate an indexing plan; "
+        + "Set \"execute\" to execute the indexing plan at the given instant, which means --instant-time is required here; "
+        + "Set \"scheduleAndExecute\" to generate an indexing plan first and execute that plan immediately")
+    public String runningMode = null;
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+
+    @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+        + "hoodie client for compacting")
+    public String propsFilePath = null;
+
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+        + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
+    public List<String> configs = new ArrayList<>();
+  }
+
+  public static void main(String[] args) {
+    final HoodieIndexer.Config cfg = new HoodieIndexer.Config();
+    JCommander cmd = new JCommander(cfg, null, args);
+
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+
+    final JavaSparkContext jsc = UtilHelpers.buildSparkContext("indexing-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
+    HoodieIndexer indexer = new HoodieIndexer(jsc, cfg);
+    int result = indexer.start(cfg.retry);
+    String resultMsg = String.format("Indexing with basePath: %s, tableName: %s, runningMode: %s",
+        cfg.basePath, cfg.tableName, cfg.runningMode);
+    if (result == -1) {
+      LOG.error(resultMsg + " failed");
+    } else {
+      LOG.info(resultMsg + " success");
+    }
+    jsc.stop();
+  }
+
+  private int start(int retry) {
+    return UtilHelpers.retry(retry, () -> {
+      switch (cfg.runningMode.toLowerCase()) {
+        case SCHEDULE: {
+          LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
+          Option<String> instantTime = scheduleIndexing(jsc);
+          int result = instantTime.isPresent() ? 0 : -1;
+          if (result == 0) {
+            LOG.info("The schedule instant time is " + instantTime.get());
+          }
+          return result;
+        }
+        case SCHEDULE_AND_EXECUTE: {
+          LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
+          return scheduleAndRunIndexing(jsc);
+        }
+        case EXECUTE: {
+          LOG.info("Running Mode: [" + EXECUTE + "];");
+          return runIndexing(jsc);
+        }
+        default: {
+          LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit the job directly");
+          return -1;
+        }
+      }
+    }, "Indexer failed");
+  }
+
+  private Option<String> scheduleIndexing(JavaSparkContext jsc) throws Exception {
+    String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);

Review comment:
       Maybe allow only one indexing operation (or one indexing operation per partition) to be scheduled at a time.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+public class HoodieIndexer {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+
+  private final HoodieIndexer.Config cfg;
+  private TypedProperties props;
+  private final JavaSparkContext jsc;
+  private final HoodieTableMetaClient metaClient;
+
+  public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    this.cfg = cfg;
+    this.jsc = jsc;
+    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+  }
+
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
+    public String basePath = null;
+    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
+    public String tableName = null;
+    @Parameter(names = {"--instant-time", "-it"}, description = "Indexing Instant time")
+    public String indexInstantTime = null;
+    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
+    public int parallelism = 1;
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+    public String sparkMaster = null;
+    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)

Review comment:
       Why is this required to be passed here? I assume spark-submit is a better place to specify Spark params.

##########
File path: hudi-common/src/main/avro/HoodieIndexPartitionInfo.avsc
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "namespace": "org.apache.hudi.avro.model",
+  "type": "record",
+  "name": "HoodieIndexPartitionInfo",
+  "fields": [
+    {
+      "name": "version",
+      "type": [
+        "int",
+        "null"
+      ],
+      "default": 1
+    },
+    {
+      "name": "metadataPartitionPath",
+      "type": [
+        "null",
+        "string"
+      ],
+      "default": null
+    },
+    {
+      "name": "indexUptoInstant",

Review comment:
       Since this is assumed common across all partitions being indexed, does it make sense to move this to HoodieIndexCommitMetadata?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+public class HoodieIndexer {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+
+  private final HoodieIndexer.Config cfg;
+  private TypedProperties props;
+  private final JavaSparkContext jsc;
+  private final HoodieTableMetaClient metaClient;
+
+  public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    this.cfg = cfg;
+    this.jsc = jsc;
+    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+  }
+
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
+    public String basePath = null;
+    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
+    public String tableName = null;
+    @Parameter(names = {"--instant-time", "-it"}, description = "Indexing Instant time")
+    public String indexInstantTime = null;
+    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
+    public int parallelism = 1;
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+    public String sparkMaster = null;
+    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
+    public String sparkMemory = null;
+    @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
+    public int retry = 0;
+    @Parameter(names = {"--schedule", "-sc"}, description = "Schedule indexing")
+    public Boolean runSchedule = false;
+    @Parameter(names = {"--strategy", "-st"}, description = "Comma-separated index types to be built, e.g. BLOOM,FILES,COLSTATS")
+    public String indexTypes = null;
+    @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" to generate an indexing plan; "
+        + "Set \"execute\" to execute the indexing plan at the given instant, which means --instant-time is required here; "
+        + "Set \"scheduleAndExecute\" to generate an indexing plan first and execute that plan immediately")
+    public String runningMode = null;
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+
+    @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+        + "hoodie client for compacting")
+    public String propsFilePath = null;
+
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+        + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
+    public List<String> configs = new ArrayList<>();
+  }
+
+  public static void main(String[] args) {
+    final HoodieIndexer.Config cfg = new HoodieIndexer.Config();
+    JCommander cmd = new JCommander(cfg, null, args);
+
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+
+    final JavaSparkContext jsc = UtilHelpers.buildSparkContext("indexing-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
+    HoodieIndexer indexer = new HoodieIndexer(jsc, cfg);
+    int result = indexer.start(cfg.retry);
+    String resultMsg = String.format("Indexing with basePath: %s, tableName: %s, runningMode: %s",
+        cfg.basePath, cfg.tableName, cfg.runningMode);
+    if (result == -1) {
+      LOG.error(resultMsg + " failed");
+    } else {
+      LOG.info(resultMsg + " success");
+    }
+    jsc.stop();
+  }
+
+  private int start(int retry) {
+    return UtilHelpers.retry(retry, () -> {
+      switch (cfg.runningMode.toLowerCase()) {
+        case SCHEDULE: {
+          LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
+          Option<String> instantTime = scheduleIndexing(jsc);
+          int result = instantTime.isPresent() ? 0 : -1;
+          if (result == 0) {
+            LOG.info("The schedule instant time is " + instantTime.get());
+          }
+          return result;
+        }
+        case SCHEDULE_AND_EXECUTE: {
+          LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
+          return scheduleAndRunIndexing(jsc);
+        }
+        case EXECUTE: {
+          LOG.info("Running Mode: [" + EXECUTE + "];");
+          return runIndexing(jsc);
+        }
+        default: {
+          LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit the job directly");
+          return -1;
+        }
+      }
+    }, "Indexer failed");
+  }
+
+  private Option<String> scheduleIndexing(JavaSparkContext jsc) throws Exception {
+    String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
+    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      return doSchedule(client);
+    }
+  }
+
+  private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> client) {
+    List<String> partitionsToIndex = Arrays.asList(cfg.indexTypes.split(","));
+    Option<String> indexingInstant = client.scheduleIndexing(partitionsToIndex);
+    if (!indexingInstant.isPresent()) {
+      LOG.error("Scheduling of index action did not return any instant.");
+    }
+    return indexingInstant;
+  }
+
+  private int runIndexing(JavaSparkContext jsc) throws Exception {
+    String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
+    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      if (StringUtils.isNullOrEmpty(cfg.indexInstantTime)) {
+        // Instant time is not specified
+        // Find the earliest scheduled indexing instant for execution
+        Option<HoodieInstant> earliestPendingIndexInstant = metaClient.getActiveTimeline()
+            .filterPendingIndexTimeline()
+            .filter(i -> !(i.isCompleted() || INFLIGHT.equals(i.getState())))
+            .firstInstant();
+        if (earliestPendingIndexInstant.isPresent()) {
+          cfg.indexInstantTime = earliestPendingIndexInstant.get().getTimestamp();
+          LOG.info("Found the earliest scheduled indexing instant which will be executed: "
+              + cfg.indexInstantTime);
+        } else {
+          throw new HoodieIndexException("There is no scheduled indexing in the table.");
+        }
+      }
+      return handleError(client.index(cfg.indexInstantTime));
+    }
+  }
+
+  private int scheduleAndRunIndexing(JavaSparkContext jsc) throws Exception {
+    String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);

Review comment:
       Code duplication here. I suggested above how to remove this function.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+public class HoodieIndexer {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+
+  private final HoodieIndexer.Config cfg;
+  private TypedProperties props;
+  private final JavaSparkContext jsc;
+  private final HoodieTableMetaClient metaClient;
+
+  public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    this.cfg = cfg;
+    this.jsc = jsc;
+    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+  }
+
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
+    public String basePath = null;
+    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
+    public String tableName = null;
+    @Parameter(names = {"--instant-time", "-it"}, description = "Indexing Instant time")
+    public String indexInstantTime = null;
+    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
+    public int parallelism = 1;
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+    public String sparkMaster = null;
+    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
+    public String sparkMemory = null;
+    @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
+    public int retry = 0;
+    @Parameter(names = {"--schedule", "-sc"}, description = "Schedule indexing")
+    public Boolean runSchedule = false;
+    @Parameter(names = {"--strategy", "-st"}, description = "Comma-separated index types to be built, e.g. BLOOM,FILES,COLSTATS")
+    public String indexTypes = null;
+    @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" to generate an indexing plan; "
+        + "Set \"execute\" to execute the indexing plan at the given instant, which means --instant-time is required here; "
+        + "Set \"scheduleAndExecute\" to generate an indexing plan first and execute that plan immediately")
+    public String runningMode = null;
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+
+    @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+        + "hoodie client for compacting")
+    public String propsFilePath = null;
+
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+        + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
+    public List<String> configs = new ArrayList<>();
+  }
+
+  public static void main(String[] args) {
+    final HoodieIndexer.Config cfg = new HoodieIndexer.Config();
+    JCommander cmd = new JCommander(cfg, null, args);
+
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+
+    final JavaSparkContext jsc = UtilHelpers.buildSparkContext("indexing-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
+    HoodieIndexer indexer = new HoodieIndexer(jsc, cfg);
+    int result = indexer.start(cfg.retry);
+    String resultMsg = String.format("Indexing with basePath: %s, tableName: %s, runningMode: %s",
+        cfg.basePath, cfg.tableName, cfg.runningMode);
+    if (result == -1) {
+      LOG.error(resultMsg + " failed");
+    } else {
+      LOG.info(resultMsg + " success");
+    }
+    jsc.stop();
+  }
+
+  private int start(int retry) {
+    return UtilHelpers.retry(retry, () -> {
+      switch (cfg.runningMode.toLowerCase()) {
+        case SCHEDULE: {
+          LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
+          Option<String> instantTime = scheduleIndexing(jsc);
+          int result = instantTime.isPresent() ? 0 : -1;
+          if (result == 0) {
+            LOG.info("The schedule instant time is " + instantTime.get());
+          }
+          return result;
+        }
+        case SCHEDULE_AND_EXECUTE: {
+          LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
+          return scheduleAndRunIndexing(jsc);
+        }
+        case EXECUTE: {
+          LOG.info("Running Mode: [" + EXECUTE + "];");
+          return runIndexing(jsc);
+        }
+        default: {
+          LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit the job directly");
+          return -1;
+        }
+      }
+    }, "Indexer failed");
+  }
+
+  private Option<String> scheduleIndexing(JavaSparkContext jsc) throws Exception {
+    String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
+    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      return doSchedule(client);
+    }
+  }
+
+  private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> client) {
+    List<String> partitionsToIndex = Arrays.asList(cfg.indexTypes.split(","));
+    Option<String> indexingInstant = client.scheduleIndexing(partitionsToIndex);
+    if (!indexingInstant.isPresent()) {
+      LOG.error("Scheduling of index action did not return any instant.");
+    }
+    return indexingInstant;
+  }
+
+  private int runIndexing(JavaSparkContext jsc) throws Exception {
+    String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
+    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      if (StringUtils.isNullOrEmpty(cfg.indexInstantTime)) {
+        // Instant time is not specified
+        // Find the earliest scheduled indexing instant for execution
+        Option<HoodieInstant> earliestPendingIndexInstant = metaClient.getActiveTimeline()
+            .filterPendingIndexTimeline()
+            .filter(i -> !(i.isCompleted() || INFLIGHT.equals(i.getState())))
+            .firstInstant();
+        if (earliestPendingIndexInstant.isPresent()) {
+          cfg.indexInstantTime = earliestPendingIndexInstant.get().getTimestamp();
+          LOG.info("Found the earliest scheduled indexing instant which will be executed: "
+              + cfg.indexInstantTime);
+        } else {
+          throw new HoodieIndexException("There is no scheduled indexing in the table.");
+        }
+      }
+      return handleError(client.index(cfg.indexInstantTime));
+    }
+  }
+
+  private int scheduleAndRunIndexing(JavaSparkContext jsc) throws Exception {
+    String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
+    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      Option<String> indexingInstantTime = doSchedule(client);
+      if (indexingInstantTime.isPresent()) {
+        return handleError(client.index(indexingInstantTime.get()));
+      } else {
+        return -1;
+      }
+    }
+  }
+
+  private int handleError(Option<HoodieIndexCommitMetadata> commitMetadata) {

Review comment:
       A boolean seems like a better return value for this function.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieIndexer.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
+import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIndexException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
+import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
+import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+
+public class HoodieIndexer {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieIndexer.class);
+
+  private final HoodieIndexer.Config cfg;
+  private TypedProperties props;
+  private final JavaSparkContext jsc;
+  private final HoodieTableMetaClient metaClient;
+
+  public HoodieIndexer(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    this.cfg = cfg;
+    this.jsc = jsc;
+    this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath)
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+    this.metaClient = UtilHelpers.createMetaClient(jsc, cfg.basePath, true);
+  }
+
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, HoodieIndexer.Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
+    public String basePath = null;
+    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
+    public String tableName = null;
+    @Parameter(names = {"--instant-time", "-it"}, description = "Indexing Instant time")
+    public String indexInstantTime = null;
+    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert", required = true)
+    public int parallelism = 1;
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master")
+    public String sparkMaster = null;
+    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true)
+    public String sparkMemory = null;
+    @Parameter(names = {"--retry", "-rt"}, description = "number of retries")
+    public int retry = 0;
+    @Parameter(names = {"--schedule", "-sc"}, description = "Schedule indexing")
+    public Boolean runSchedule = false;
+    @Parameter(names = {"--strategy", "-st"}, description = "Comma-separated index types to be built, e.g. BLOOM,FILES,COLSTATS")
+    public String indexTypes = null;
+    @Parameter(names = {"--mode", "-m"}, description = "Set job mode: Set \"schedule\" to generate an indexing plan; "
+        + "Set \"execute\" to execute the indexing plan at the given instant, which means --instant-time is required here; "
+        + "Set \"scheduleAndExecute\" to generate an indexing plan first and execute that plan immediately")
+    public String runningMode = null;
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+
+    @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+        + "hoodie client for compacting")
+    public String propsFilePath = null;
+
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+        + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
+    public List<String> configs = new ArrayList<>();
+  }
+
+  public static void main(String[] args) {
+    final HoodieIndexer.Config cfg = new HoodieIndexer.Config();
+    JCommander cmd = new JCommander(cfg, null, args);
+
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+
+    final JavaSparkContext jsc = UtilHelpers.buildSparkContext("indexing-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
+    HoodieIndexer indexer = new HoodieIndexer(jsc, cfg);
+    int result = indexer.start(cfg.retry);
+    String resultMsg = String.format("Indexing with basePath: %s, tableName: %s, runningMode: %s",
+        cfg.basePath, cfg.tableName, cfg.runningMode);
+    if (result == -1) {
+      LOG.error(resultMsg + " failed");
+    } else {
+      LOG.info(resultMsg + " success");
+    }
+    jsc.stop();
+  }
+
+  private int start(int retry) {
+    return UtilHelpers.retry(retry, () -> {
+      switch (cfg.runningMode.toLowerCase()) {
+        case SCHEDULE: {
+          LOG.info("Running Mode: [" + SCHEDULE + "]; Do schedule");
+          Option<String> instantTime = scheduleIndexing(jsc);
+          int result = instantTime.isPresent() ? 0 : -1;
+          if (result == 0) {
+            LOG.info("The schedule instant time is " + instantTime.get());
+          }
+          return result;
+        }
+        case SCHEDULE_AND_EXECUTE: {
+          LOG.info("Running Mode: [" + SCHEDULE_AND_EXECUTE + "]");
+          return scheduleAndRunIndexing(jsc);
+        }
+        case EXECUTE: {
+          LOG.info("Running Mode: [" + EXECUTE + "];");
+          return runIndexing(jsc);
+        }
+        default: {
+          LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit the job directly");
+          return -1;
+        }
+      }
+    }, "Indexer failed");
+  }
+
+  private Option<String> scheduleIndexing(JavaSparkContext jsc) throws Exception {
+    String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
+    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      return doSchedule(client);
+    }
+  }
+
+  private Option<String> doSchedule(SparkRDDWriteClient<HoodieRecordPayload> client) {
+    List<String> partitionsToIndex = Arrays.asList(cfg.indexTypes.split(","));
+    Option<String> indexingInstant = client.scheduleIndexing(partitionsToIndex);
+    if (!indexingInstant.isPresent()) {
+      LOG.error("Scheduling of index action did not return any instant.");
+    }
+    return indexingInstant;
+  }
+
+  private int runIndexing(JavaSparkContext jsc) throws Exception {
+    String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
+    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      if (StringUtils.isNullOrEmpty(cfg.indexInstantTime)) {
+        // Instant time is not specified
+        // Find the earliest scheduled indexing instant for execution
+        Option<HoodieInstant> earliestPendingIndexInstant = metaClient.getActiveTimeline()
+            .filterPendingIndexTimeline()
+            .filter(i -> !(i.isCompleted() || INFLIGHT.equals(i.getState())))
+            .firstInstant();
+        if (earliestPendingIndexInstant.isPresent()) {
+          cfg.indexInstantTime = earliestPendingIndexInstant.get().getTimestamp();
+          LOG.info("Found the earliest scheduled indexing instant which will be executed: "
+              + cfg.indexInstantTime);
+        } else {
+          throw new HoodieIndexException("There is no scheduled indexing in the table.");
+        }
+      }
+      return handleError(client.index(cfg.indexInstantTime));
+    }
+  }
+
+  private int scheduleAndRunIndexing(JavaSparkContext jsc) throws Exception {
+    String schemaStr = UtilHelpers.getSchemaFromLatestInstant(metaClient);
+    try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      Option<String> indexingInstantTime = doSchedule(client);
+      if (indexingInstantTime.isPresent()) {
+        return handleError(client.index(indexingInstantTime.get()));
+      } else {
+        return -1;
+      }
+    }
+  }
+
+  private int handleError(Option<HoodieIndexCommitMetadata> commitMetadata) {
+    if (!commitMetadata.isPresent()) {
+      LOG.error("Indexing failed as no commit metadata present.");
+      return -1;
+    }

Review comment:
       Another error check should be that indexing completed for all required partitions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org