You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2022/10/12 12:04:45 UTC

[hudi] branch master updated: Implement Create/Drop/Show/Refresh Secondary Index (#5933)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b789d14535 Implement Create/Drop/Show/Refresh Secondary Index (#5933)
b789d14535 is described below

commit b789d14535c143e14640b18af8fd90b24d7b5b64
Author: huberylee <sh...@foxmail.com>
AuthorDate: Wed Oct 12 20:04:35 2022 +0800

    Implement Create/Drop/Show/Refresh Secondary Index (#5933)
---
 .../hudi/common/table/HoodieTableConfig.java       |  13 ++
 .../hudi/common/table/HoodieTableMetaClient.java   |  14 +-
 .../exception/HoodieSecondaryIndexException.java   |  30 +++
 .../index/HoodieSecondaryIndex.java}               |  82 ++++----
 .../secondary/index/SecondaryIndexManager.java     | 221 +++++++++++++++++++++
 .../index/SecondaryIndexType.java}                 |  14 +-
 .../hudi/secondary/index/SecondaryIndexUtils.java  |  87 ++++++++
 .../spark/sql/catalyst/plans/logical/Index.scala   |   2 +-
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |   4 +-
 .../spark/sql/hudi/command/IndexCommands.scala     |  97 ++++++---
 .../sql/hudi/command/index/TestIndexSyntax.scala   |   4 +-
 .../hudi/command/index/TestSecondaryIndex.scala    |  92 +++++++++
 12 files changed, 585 insertions(+), 75 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index f295b0019c..ac3608fc00 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -252,6 +252,11 @@ public class HoodieTableConfig extends HoodieConfig {
       .withDocumentation("Comma-separated list of metadata partitions that have been completely built and in-sync with data table. "
           + "These partitions are ready for use by the readers");
 
+  public static final ConfigProperty<String> SECONDARY_INDEXES_METADATA = ConfigProperty
+      .key("hoodie.table.secondary.indexes.metadata")
+      .noDefaultValue()
+      .withDocumentation("The metadata of secondary indexes");
+
   private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; // <database_name>.<table_name>
 
   public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) {
@@ -516,6 +521,14 @@ public class HoodieTableConfig extends HoodieConfig {
     return Option.empty();
   }
 
+  public Option<String> getSecondaryIndexesMetadata() {
+    if (contains(SECONDARY_INDEXES_METADATA)) {
+      return Option.of(getString(SECONDARY_INDEXES_METADATA));
+    }
+
+    return Option.empty();
+  }
+
   /**
    * @returns the partition field prop.
    * @deprecated please use {@link #getPartitionFields()} instead
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 610a0f4185..87f2410af4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -744,6 +744,7 @@ public class HoodieTableMetaClient implements Serializable {
     private Boolean shouldDropPartitionColumns;
     private String metadataPartitions;
     private String inflightMetadataPartitions;
+    private String secondaryIndexesMetadata;
 
     /**
      * Persist the configs that is written at the first time, and should not be changed.
@@ -888,6 +889,11 @@ public class HoodieTableMetaClient implements Serializable {
       return this;
     }
 
+    public PropertyBuilder setSecondaryIndexesMetadata(String secondaryIndexesMetadata) {
+      this.secondaryIndexesMetadata = secondaryIndexesMetadata;
+      return this;
+    }
+
     private void set(String key, Object value) {
       if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) {
         this.others.put(key, value);
@@ -895,7 +901,7 @@ public class HoodieTableMetaClient implements Serializable {
     }
 
     public PropertyBuilder set(Map<String, Object> props) {
-      for (String key: HoodieTableConfig.PERSISTED_CONFIG_LIST) {
+      for (String key : HoodieTableConfig.PERSISTED_CONFIG_LIST) {
         Object value = props.get(key);
         if (value != null) {
           set(key, value);
@@ -1000,6 +1006,9 @@ public class HoodieTableMetaClient implements Serializable {
       if (hoodieConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT)) {
         setInflightMetadataPartitions(hoodieConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT));
       }
+      if (hoodieConfig.contains(HoodieTableConfig.SECONDARY_INDEXES_METADATA)) {
+        setSecondaryIndexesMetadata(hoodieConfig.getString(HoodieTableConfig.SECONDARY_INDEXES_METADATA));
+      }
       return this;
     }
 
@@ -1096,6 +1105,9 @@ public class HoodieTableMetaClient implements Serializable {
       if (null != inflightMetadataPartitions) {
         tableConfig.setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT, inflightMetadataPartitions);
       }
+      if (null != secondaryIndexesMetadata) {
+        tableConfig.setValue(HoodieTableConfig.SECONDARY_INDEXES_METADATA, secondaryIndexesMetadata);
+      }
       return tableConfig.getProps();
     }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieSecondaryIndexException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieSecondaryIndexException.java
new file mode 100644
index 0000000000..361416c909
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieSecondaryIndexException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.exception;
+
+public class HoodieSecondaryIndexException extends HoodieException {
+  public HoodieSecondaryIndexException(String message) {
+    super(message);
+  }
+
+  public HoodieSecondaryIndexException(String message, Throwable t) {
+    super(message, t);
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndex.java b/hudi-common/src/main/java/org/apache/hudi/secondary/index/HoodieSecondaryIndex.java
similarity index 54%
rename from hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndex.java
rename to hudi-common/src/main/java/org/apache/hudi/secondary/index/HoodieSecondaryIndex.java
index 6dabb1a41f..8b50d9268e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/secondary/index/HoodieSecondaryIndex.java
@@ -17,48 +17,48 @@
  * under the License.
  */
 
-package org.apache.hudi.common.index;
+package org.apache.hudi.secondary.index;
 
-import java.util.Arrays;
+import org.apache.hudi.exception.HoodieSecondaryIndexException;
+
+import java.util.Comparator;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
-public class HoodieIndex {
+public class HoodieSecondaryIndex {
   private String indexName;
-  private String[] colNames;
-  private HoodieIndexType indexType;
-  private Map<String, Map<String, String>> colOptions;
+  private SecondaryIndexType indexType;
+
+  // The index fields need to be in order
+  private LinkedHashMap<String, Map<String, String>> columns;
   private Map<String, String> options;
 
-  public HoodieIndex() {
+  public HoodieSecondaryIndex() {
   }
 
-  public HoodieIndex(
+  public HoodieSecondaryIndex(
       String indexName,
-      String[] colNames,
-      HoodieIndexType indexType,
-      Map<String, Map<String, String>> colOptions,
+      SecondaryIndexType indexType,
+      LinkedHashMap<String, Map<String, String>> columns,
       Map<String, String> options) {
     this.indexName = indexName;
-    this.colNames = colNames;
     this.indexType = indexType;
-    this.colOptions = colOptions;
+    this.columns = columns;
     this.options = options;
+
+    validate();
   }
 
   public String getIndexName() {
     return indexName;
   }
 
-  public String[] getColNames() {
-    return colNames;
-  }
-
-  public HoodieIndexType getIndexType() {
+  public SecondaryIndexType getIndexType() {
     return indexType;
   }
 
-  public Map<String, Map<String, String>> getColOptions() {
-    return colOptions;
+  public Map<String, Map<String, String>> getColumns() {
+    return columns;
   }
 
   public Map<String, String> getOptions() {
@@ -69,22 +69,32 @@ public class HoodieIndex {
     return new Builder();
   }
 
+  private void validate() {
+    switch (indexType) {
+      case LUCENE:
+        if (columns.size() != 1) {
+          throw new HoodieSecondaryIndexException("Lucene index only support single column");
+        }
+        break;
+      default:
+        return;
+    }
+  }
+
   @Override
   public String toString() {
     return "HoodieIndex{"
         + "indexName='" + indexName + '\''
-        + ", colNames='" + Arrays.toString(colNames) + '\''
         + ", indexType=" + indexType
-        + ", colOptions=" + colOptions
+        + ", columns=" + columns
         + ", options=" + options
         + '}';
   }
 
   public static class Builder {
     private String indexName;
-    private String[] colNames;
-    private HoodieIndexType indexType;
-    private Map<String, Map<String, String>> colOptions;
+    private SecondaryIndexType indexType;
+    private LinkedHashMap<String, Map<String, String>> columns;
     private Map<String, String> options;
 
     public Builder setIndexName(String indexName) {
@@ -92,18 +102,13 @@ public class HoodieIndex {
       return this;
     }
 
-    public Builder setColNames(String[] colNames) {
-      this.colNames = colNames;
-      return this;
-    }
-
     public Builder setIndexType(String indexType) {
-      this.indexType = HoodieIndexType.of(indexType);
+      this.indexType = SecondaryIndexType.of(indexType);
       return this;
     }
 
-    public Builder setColOptions(Map<String, Map<String, String>> colOptions) {
-      this.colOptions = colOptions;
+    public Builder setColumns(LinkedHashMap<String, Map<String, String>> columns) {
+      this.columns = columns;
       return this;
     }
 
@@ -112,8 +117,15 @@ public class HoodieIndex {
       return this;
     }
 
-    public HoodieIndex build() {
-      return new HoodieIndex(indexName, colNames, indexType, colOptions, options);
+    public HoodieSecondaryIndex build() {
+      return new HoodieSecondaryIndex(indexName, indexType, columns, options);
+    }
+  }
+
+  public static class HoodieIndexCompactor implements Comparator<HoodieSecondaryIndex> {
+    @Override
+    public int compare(HoodieSecondaryIndex o1, HoodieSecondaryIndex o2) {
+      return o1.indexName.compareTo(o2.indexName);
     }
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexManager.java b/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexManager.java
new file mode 100644
index 0000000000..61bd7d7621
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexManager.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.secondary.index;
+
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieSecondaryIndexException;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.secondary.index.SecondaryIndexUtils.getSecondaryIndexes;
+
+public class SecondaryIndexManager {
+  private static final Logger LOG = LoggerFactory.getLogger(SecondaryIndexManager.class);
+
+  private static volatile SecondaryIndexManager _instance;
+
+  private SecondaryIndexManager() {
+  }
+
+  public static SecondaryIndexManager getInstance() {
+    if (_instance == null) {
+      synchronized (SecondaryIndexManager.class) {
+        if (_instance == null) {
+          _instance = new SecondaryIndexManager();
+        }
+      }
+    }
+
+    return _instance;
+  }
+
+  /**
+   * Create a secondary index for a hoodie table. Two steps are performed:
+   * 1. Add secondary index metadata to hoodie.properties
+   * 2. Trigger build secondary index (not implemented yet, see the TODO below)
+   *
+   * @param metaClient     Hoodie table meta client
+   * @param indexName      The unique secondary index name
+   * @param indexType      Index type
+   * @param ignoreIfExists Whether to skip the creation silently if the secondary index exists
+   * @param columns        The columns referenced by this index, each with its own options
+   * @param options        Options for this secondary index
+   * @throws HoodieSecondaryIndexException if the schema is unreadable, a column is missing or the index exists
+   */
+  public void create(
+      HoodieTableMetaClient metaClient,
+      String indexName,
+      String indexType,
+      boolean ignoreIfExists,
+      LinkedHashMap<String, Map<String, String>> columns,
+      Map<String, String> options) {
+    Option<List<HoodieSecondaryIndex>> secondaryIndexes = getSecondaryIndexes(metaClient);
+    Set<String> colNames = columns.keySet();
+    Schema avroSchema;
+    try {
+      avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema(false);
+    } catch (Exception e) {
+      throw new HoodieSecondaryIndexException(
+          "Failed to get table avro schema: " + metaClient.getTableConfig().getTableName(), e);
+    }
+
+    for (String col : colNames) {
+      if (avroSchema.getField(col) == null) {
+        throw new HoodieSecondaryIndexException("Field not exists: " + col);
+      }
+    }
+
+    if (indexExists(secondaryIndexes, indexName, Option.of(indexType), Option.of(colNames))) {
+      if (ignoreIfExists) {
+        return;
+      } else {
+        throw new HoodieSecondaryIndexException("Secondary index already exists: " + indexName);
+      }
+    }
+
+    HoodieSecondaryIndex secondaryIndexToAdd = HoodieSecondaryIndex.builder()
+        .setIndexName(indexName)
+        .setIndexType(indexType)
+        .setColumns(columns)
+        .setOptions(options)
+        .build();
+
+    List<HoodieSecondaryIndex> newSecondaryIndexes = secondaryIndexes.map(h -> {
+      h.add(secondaryIndexToAdd);
+      return h;
+    }).orElse(Collections.singletonList(secondaryIndexToAdd));
+    newSecondaryIndexes.sort(new HoodieSecondaryIndex.HoodieIndexCompactor());
+
+    // Persist the secondary indexes' metadata to the hoodie.properties file
+    Properties updatedProps = new Properties();
+    updatedProps.put(HoodieTableConfig.SECONDARY_INDEXES_METADATA.key(),
+        SecondaryIndexUtils.toJsonString(newSecondaryIndexes));
+    HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), updatedProps);
+
+    LOG.info("Success to add secondary index metadata: {}", secondaryIndexToAdd);
+
+    // TODO: build index
+  }
+
+  /**
+   * Drop a secondary index by index name
+   *
+   * @param metaClient        Hoodie table meta client
+   * @param indexName         The unique secondary index name
+   * @param ignoreIfNotExists Whether to ignore the drop if the specified secondary index does not exist
+   */
+  public void drop(HoodieTableMetaClient metaClient, String indexName, boolean ignoreIfNotExists) {
+    Option<List<HoodieSecondaryIndex>> secondaryIndexes = getSecondaryIndexes(metaClient);
+    if (!indexExists(secondaryIndexes, indexName, Option.empty(), Option.empty())) {
+      if (ignoreIfNotExists) {
+        return;
+      } else {
+        throw new HoodieSecondaryIndexException("Secondary index not exists: " + indexName);
+      }
+    }
+
+    List<HoodieSecondaryIndex> secondaryIndexesToKeep = secondaryIndexes.get().stream()
+        .filter(i -> !i.getIndexName().equals(indexName))
+        .sorted(new HoodieSecondaryIndex.HoodieIndexCompactor())
+        .collect(Collectors.toList());
+    if (CollectionUtils.nonEmpty(secondaryIndexesToKeep)) {
+      Properties updatedProps = new Properties();
+      updatedProps.put(HoodieTableConfig.SECONDARY_INDEXES_METADATA.key(),
+          SecondaryIndexUtils.toJsonString(secondaryIndexesToKeep));
+      HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), updatedProps);
+    } else {
+      HoodieTableConfig.delete(metaClient.getFs(), new Path(metaClient.getMetaPath()),
+          CollectionUtils.createSet(HoodieTableConfig.SECONDARY_INDEXES_METADATA.key()));
+    }
+
+    LOG.info("Success to delete secondary index metadata: {}", indexName);
+
+    // TODO: drop index data
+  }
+
+  /**
+   * Show secondary indexes from hoodie table
+   *
+   * @param metaClient Hoodie table meta client
+   * @return Indexes in this table
+   */
+  public Option<List<HoodieSecondaryIndex>> show(HoodieTableMetaClient metaClient) {
+    return getSecondaryIndexes(metaClient);
+  }
+
+  /**
+   * Refresh the specific secondary index
+   *
+   * @param metaClient Hoodie table meta client
+   * @param indexName  The target secondary index name
+   */
+  public void refresh(HoodieTableMetaClient metaClient, String indexName) {
+    // TODO
+  }
+
+  /**
+   * Check if the specific secondary index exists. When dropping a secondary index,
+   * only the index name is checked, but when adding a secondary index, we also
+   * check the index type and columns if the index name is different.
+   *
+   * @param secondaryIndexes Current secondary indexes in this table
+   * @param indexName        The index name of target secondary index
+   * @param indexType        The index type of target secondary index
+   * @param colNames         The column names of target secondary index
+   * @return true if secondary index exists
+   */
+  private boolean indexExists(
+      Option<List<HoodieSecondaryIndex>> secondaryIndexes,
+      String indexName,
+      Option<String> indexType,
+      Option<Set<String>> colNames) {
+    return secondaryIndexes.map(indexes ->
+        indexes.stream().anyMatch(index -> {
+          if (index.getIndexName().equals(indexName)) {
+            return true;
+          } else if (indexType.isPresent() && colNames.isPresent()) {
+            // When secondary index names are different, we should check index type
+            // and index columns to avoid repeatedly creating the same index.
+            // For example:
+            //   create index idx_name on test using lucene (name);
+            //   create index idx_name_1 on test using lucene (name);
+            return index.getIndexType().name().equalsIgnoreCase(indexType.get())
+                && CollectionUtils.diff(index.getColumns().keySet(), colNames.get()).isEmpty();
+          }
+
+          return false;
+        })).orElse(false);
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndexType.java b/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexType.java
similarity index 80%
rename from hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndexType.java
rename to hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexType.java
index 03618a7679..108d2effce 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndexType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexType.java
@@ -17,18 +17,18 @@
  * under the License.
  */
 
-package org.apache.hudi.common.index;
+package org.apache.hudi.secondary.index;
 
 import org.apache.hudi.exception.HoodieIndexException;
 
 import java.util.Arrays;
 
-public enum HoodieIndexType {
+public enum SecondaryIndexType {
   LUCENE((byte) 1);
 
   private final byte type;
 
-  HoodieIndexType(byte type) {
+  SecondaryIndexType(byte type) {
     this.type = type;
   }
 
@@ -36,16 +36,16 @@ public enum HoodieIndexType {
     return type;
   }
 
-  public static HoodieIndexType of(byte indexType) {
-    return Arrays.stream(HoodieIndexType.values())
+  public static SecondaryIndexType of(byte indexType) {
+    return Arrays.stream(SecondaryIndexType.values())
         .filter(t -> t.type == indexType)
         .findAny()
         .orElseThrow(() ->
             new HoodieIndexException("Unknown hoodie index type:" + indexType));
   }
 
-  public static HoodieIndexType of(String indexType) {
-    return Arrays.stream(HoodieIndexType.values())
+  public static SecondaryIndexType of(String indexType) {
+    return Arrays.stream(SecondaryIndexType.values())
         .filter(t -> t.name().equals(indexType.toUpperCase()))
         .findAny()
         .orElseThrow(() ->
diff --git a/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexUtils.java b/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexUtils.java
new file mode 100644
index 0000000000..40b2ee9124
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.secondary.index;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.HoodieSecondaryIndexException;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.List;
+
+public class SecondaryIndexUtils {
+
+  /**
+   * Get secondary index metadata for this table
+   *
+   * @param metaClient HoodieTableMetaClient
+   * @return HoodieSecondaryIndex List
+   */
+  public static Option<List<HoodieSecondaryIndex>> getSecondaryIndexes(HoodieTableMetaClient metaClient) {
+    Option<String> indexesMetadata = metaClient.getTableConfig().getSecondaryIndexesMetadata();
+    return indexesMetadata.map(SecondaryIndexUtils::fromJsonString);
+  }
+
+  /**
+   * Parse a secondary index JSON string to List<HoodieSecondaryIndex>
+   *
+   * @param jsonStr Secondary indexes with json format
+   * @return List<HoodieSecondaryIndex>
+   */
+  public static List<HoodieSecondaryIndex> fromJsonString(String jsonStr) {
+    try {
+      return SecondaryIndexUtils.fromJsonString(jsonStr,
+          new TypeReference<List<HoodieSecondaryIndex>>() {
+          });
+    } catch (Exception e) {
+      throw new HoodieSecondaryIndexException("Fail to get secondary indexes", e);
+    }
+  }
+
+  public static String toJsonString(Object value) {
+    try {
+      return getObjectMapper().writeValueAsString(value);
+    } catch (JsonProcessingException e) {
+      throw new HoodieIndexException("Fail to convert object to json string", e);
+    }
+  }
+
+  public static <T> T fromJsonString(String jsonStr, TypeReference<T> type) throws Exception {
+    if (jsonStr == null || jsonStr.isEmpty()) {
+      return null;
+    }
+
+    return getObjectMapper().readValue(jsonStr, type);
+  }
+
+  public static ObjectMapper getObjectMapper() {
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+    mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+    return mapper;
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
index 12ee2e8058..1cc8c99728 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
@@ -31,7 +31,7 @@ case class CreateIndex(
     indexType: String,
     ignoreIfExists: Boolean,
     columns: Seq[(Attribute, Map[String, String])],
-    properties: Map[String, String],
+    options: Map[String, String],
     override val output: Seq[Attribute] = CreateIndex.getOutputAttrs) extends Command {
 
   override def children: Seq[LogicalPlan] = Seq(table)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index c8add03098..c5688965d7 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -191,10 +191,10 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
         }
 
       // Convert to CreateIndexCommand
-      case CreateIndex(table, indexName, indexType, ignoreIfExists, columns, properties, output)
+      case CreateIndex(table, indexName, indexType, ignoreIfExists, columns, options, output)
         if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
         CreateIndexCommand(
-          getTableIdentifier(table), indexName, indexType, ignoreIfExists, columns, properties, output)
+          getTableIdentifier(table), indexName, indexType, ignoreIfExists, columns, options, output)
 
       // Convert to DropIndexCommand
       case DropIndex(table, indexName, ignoreIfNotExists, output)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
index 5d73af31a9..8a3b5630b6 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
@@ -19,23 +19,45 @@
 
 package org.apache.spark.sql.hudi.command
 
-import org.apache.hudi.common.index.HoodieIndex
+import com.fasterxml.jackson.annotation.{JsonAutoDetect, PropertyAccessor}
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.secondary.index.SecondaryIndexManager
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation
 import org.apache.spark.sql.{Row, SparkSession}
 
+import java.util
+
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter}
+
 case class CreateIndexCommand(
     tableId: TableIdentifier,
     indexName: String,
     indexType: String,
     ignoreIfExists: Boolean,
     columns: Seq[(Attribute, Map[String, String])],
-    properties: Map[String, String],
+    options: Map[String, String],
     override val output: Seq[Attribute]) extends IndexBaseCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    // The implementation for different index type
+    val metaClient = createHoodieTableMetaClient(tableId, sparkSession)
+    val columnsMap: java.util.LinkedHashMap[String, java.util.Map[String, String]] =
+      new util.LinkedHashMap[String, java.util.Map[String, String]]()
+    columns.map(c => columnsMap.put(c._1.name, c._2.asJava))
+
+    SecondaryIndexManager.getInstance().create(
+      metaClient, indexName, indexType, ignoreIfExists, columnsMap, options.asJava)
+
+    // Invalidate the cached table for queries that do not access the related
+    // table through {@code DefaultSource}
+    val qualifiedTableName = QualifiedTableName(
+      tableId.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
+      tableId.table)
+    sparkSession.sessionState.catalog.invalidateCachedTable(qualifiedTableName)
     Seq.empty
   }
 }
@@ -47,7 +69,15 @@ case class DropIndexCommand(
     override val output: Seq[Attribute]) extends IndexBaseCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    // The implementation for different index type
+    val metaClient = createHoodieTableMetaClient(tableId, sparkSession)
+    SecondaryIndexManager.getInstance().drop(metaClient, indexName, ignoreIfNotExists)
+
+    // Invalidate the cached table for queries that do not access the related
+    // table through {@code DefaultSource}
+    val qualifiedTableName = QualifiedTableName(
+      tableId.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
+      tableId.table)
+    sparkSession.sessionState.catalog.invalidateCachedTable(qualifiedTableName)
     Seq.empty
   }
 }
@@ -57,8 +87,25 @@ case class ShowIndexesCommand(
     override val output: Seq[Attribute]) extends IndexBaseCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    // The implementation for different index type
-    Seq.empty
+    val metaClient = createHoodieTableMetaClient(tableId, sparkSession)
+    val secondaryIndexes = SecondaryIndexManager.getInstance().show(metaClient)
+    // One row per index; options are rendered as JSON ("" when empty), type lower-cased via Locale.ROOT
+    val mapper = getObjectMapper
+    toScalaOption(secondaryIndexes).map(x =>
+      x.asScala.map(i => {
+        val colOptions =
+          if (i.getColumns.values().asScala.forall(_.isEmpty)) "" else mapper.writeValueAsString(i.getColumns)
+        val options = if (i.getOptions.isEmpty) "" else mapper.writeValueAsString(i.getOptions)
+        Row(i.getIndexName, i.getColumns.keySet().asScala.mkString(","),
+          i.getIndexType.name().toLowerCase(java.util.Locale.ROOT), colOptions, options)
+      }).toSeq).getOrElse(Seq.empty[Row])
+  }
+
+  protected def getObjectMapper: ObjectMapper = {
+    // Fields of any visibility are auto-detected; FAIL_ON_UNKNOWN_PROPERTIES is switched off
+    new ObjectMapper()
+      .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+      .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
   }
 }
 
@@ -68,7 +115,8 @@ case class RefreshIndexCommand(
     override val output: Seq[Attribute]) extends IndexBaseCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    // The implementation for different index type
+    val hoodieMetaClient = createHoodieTableMetaClient(tableId, sparkSession)
+    SecondaryIndexManager.getInstance().refresh(hoodieMetaClient, indexName)
     Seq.empty
   }
 }
@@ -76,26 +124,21 @@ case class RefreshIndexCommand(
 abstract class IndexBaseCommand extends HoodieLeafRunnableCommand with Logging {
 
   /**
-   * Check hoodie index exists. In a hoodie table, hoodie index name
-   * must be unique, so the index name will be checked firstly,
+   * Build a hoodie table meta client for the given table, using the table location
+   * resolved from the session catalog and the Spark context's Hadoop configuration
    *
-   * @param secondaryIndexes Current hoodie indexes
-   * @param indexName        The index name to be checked
-   * @param colNames         The column names to be checked
-   * @return true if the index exists
+   * @param tableId      Identifier of the table to look up in the session catalog
+   * @param sparkSession The spark session providing the catalog and Hadoop configuration
+   * @return The hoodie table meta client built on the table's base path
    */
-  def indexExists(
-      secondaryIndexes: Option[Array[HoodieIndex]],
-      indexName: String,
-      indexType: Option[String] = None,
-      colNames: Option[Array[String]] = None): Boolean = {
-    secondaryIndexes.exists(i => {
-      i.exists(_.getIndexName.equals(indexName)) ||
-          // Index type and column name need to be checked if present
-          indexType.exists(t =>
-            colNames.exists(c =>
-              i.exists(index =>
-                index.getIndexType.name().equalsIgnoreCase(t) && index.getColNames.sameElements(c))))
-    })
+  def createHoodieTableMetaClient(
+      tableId: TableIdentifier,
+      sparkSession: SparkSession): HoodieTableMetaClient = {
+    // Resolve the table's location from its catalog metadata
+    val tableMetadata = sparkSession.sessionState.catalog.getTableMetadata(tableId)
+    val tableLocation = getTableLocation(tableMetadata, sparkSession)
+    // Build the meta client on that location with the Spark context's Hadoop configuration
+    val hadoopConf = sparkSession.sqlContext.sparkContext.hadoopConfiguration
+    HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(tableLocation).build()
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala
index 3536ae9e0a..537d3ad6a3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala
@@ -64,7 +64,7 @@ class TestIndexSyntax extends HoodieSparkSqlTestBase {
         assertResult("idx_name")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName)
         assertResult("lucene")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType)
         assertResult(false)(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].ignoreIfExists)
-        assertResult(Map("block_size" -> "1024"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].properties)
+        assertResult(Map("block_size" -> "1024"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].options)
 
         logicalPlan = sqlParser.parsePlan(s"create index if not exists idx_price on $tableName using lucene (price options(order='desc')) options(block_size=512)")
         resolvedLogicalPlan = analyzer.execute(logicalPlan)
@@ -72,7 +72,7 @@ class TestIndexSyntax extends HoodieSparkSqlTestBase {
         assertResult("idx_price")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName)
         assertResult("lucene")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType)
         assertResult(Map("order" -> "desc"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].columns.head._2)
-        assertResult(Map("block_size" -> "512"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].properties)
+        assertResult(Map("block_size" -> "512"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].options)
 
         logicalPlan = sqlParser.parsePlan(s"drop index if exists idx_name on $tableName")
         resolvedLogicalPlan = analyzer.execute(logicalPlan)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
new file mode 100644
index 0000000000..eae89099a6
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.index
+
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+class TestSecondaryIndex extends HoodieSparkSqlTestBase {
+  test("Test Create/Show/Drop Secondary Index") {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = '$tableType',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(ts)
+             | location '$basePath'
+       """.stripMargin)
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+        checkAnswer(s"show indexes from default.$tableName")()
+
+        checkAnswer(s"create index idx_name on $tableName using lucene (name) options(block_size=1024)")()
+        checkAnswer(s"create index idx_price on $tableName using lucene (price options(order='desc')) options(block_size=512)")()
+
+        // Create an index with multiple columns
+        checkException(s"create index idx_id_ts on $tableName using lucene (id, ts)")("Lucene index only support single column")
+
+        // Create an index with a name that is already in use
+        checkException(s"create index idx_price on $tableName using lucene (price)")(
+          "Secondary index already exists: idx_price"
+        )
+
+        // Create another index on the same column (the index name differs, but the index type and column are the same)
+        checkException(s"create index idx_price_1 on $tableName using lucene (price)")(
+          "Secondary index already exists: idx_price_1"
+        )
+
+        // Both indexes are listed, each with its column-level and index-level options
+        checkAnswer(s"show indexes from $tableName")(
+          Seq("idx_name", "name", "lucene", "", "{\"block_size\":\"1024\"}"),
+          Seq("idx_price", "price", "lucene", "{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}")
+        )
+
+        checkAnswer(s"drop index idx_name on $tableName")()
+        checkException(s"drop index idx_name on $tableName")("Secondary index not exists: idx_name")
+
+        // Only idx_price is listed once idx_name has been dropped
+        checkAnswer(s"show indexes from $tableName")(
+          Seq("idx_price", "price", "lucene", "{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}")
+        )
+
+        checkAnswer(s"drop index idx_price on $tableName")()
+        checkAnswer(s"show indexes from $tableName")()
+
+        checkException(s"drop index idx_price on $tableName")("Secondary index not exists: idx_price")
+
+        checkException(s"create index idx_price_1 on $tableName using lucene (field_not_exist)")(
+          "Field not exists: field_not_exist"
+        )
+      }
+    }
+  }
+}