You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2022/10/12 12:04:45 UTC
[hudi] branch master updated: Implement Create/Drop/Show/Refresh Secondary Index (#5933)
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b789d14535 Implement Create/Drop/Show/Refresh Secondary Index (#5933)
b789d14535 is described below
commit b789d14535c143e14640b18af8fd90b24d7b5b64
Author: huberylee <sh...@foxmail.com>
AuthorDate: Wed Oct 12 20:04:35 2022 +0800
Implement Create/Drop/Show/Refresh Secondary Index (#5933)
---
.../hudi/common/table/HoodieTableConfig.java | 13 ++
.../hudi/common/table/HoodieTableMetaClient.java | 14 +-
.../exception/HoodieSecondaryIndexException.java | 30 +++
.../index/HoodieSecondaryIndex.java} | 82 ++++----
.../secondary/index/SecondaryIndexManager.java | 221 +++++++++++++++++++++
.../index/SecondaryIndexType.java} | 14 +-
.../hudi/secondary/index/SecondaryIndexUtils.java | 87 ++++++++
.../spark/sql/catalyst/plans/logical/Index.scala | 2 +-
.../spark/sql/hudi/analysis/HoodieAnalysis.scala | 4 +-
.../spark/sql/hudi/command/IndexCommands.scala | 97 ++++++---
.../sql/hudi/command/index/TestIndexSyntax.scala | 4 +-
.../hudi/command/index/TestSecondaryIndex.scala | 92 +++++++++
12 files changed, 585 insertions(+), 75 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index f295b0019c..ac3608fc00 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -252,6 +252,11 @@ public class HoodieTableConfig extends HoodieConfig {
.withDocumentation("Comma-separated list of metadata partitions that have been completely built and in-sync with data table. "
+ "These partitions are ready for use by the readers");
+ public static final ConfigProperty<String> SECONDARY_INDEXES_METADATA = ConfigProperty
+ .key("hoodie.table.secondary.indexes.metadata")
+ .noDefaultValue()
+ .withDocumentation("The metadata of secondary indexes");
+
private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; // <database_name>.<table_name>
public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) {
@@ -516,6 +521,14 @@ public class HoodieTableConfig extends HoodieConfig {
return Option.empty();
}
+ public Option<String> getSecondaryIndexesMetadata() {
+ if (contains(SECONDARY_INDEXES_METADATA)) {
+ return Option.of(getString(SECONDARY_INDEXES_METADATA));
+ }
+
+ return Option.empty();
+ }
+
/**
* @returns the partition field prop.
* @deprecated please use {@link #getPartitionFields()} instead
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 610a0f4185..87f2410af4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -744,6 +744,7 @@ public class HoodieTableMetaClient implements Serializable {
private Boolean shouldDropPartitionColumns;
private String metadataPartitions;
private String inflightMetadataPartitions;
+ private String secondaryIndexesMetadata;
/**
* Persist the configs that is written at the first time, and should not be changed.
@@ -888,6 +889,11 @@ public class HoodieTableMetaClient implements Serializable {
return this;
}
+ public PropertyBuilder setSecondaryIndexesMetadata(String secondaryIndexesMetadata) {
+ this.secondaryIndexesMetadata = secondaryIndexesMetadata;
+ return this;
+ }
+
private void set(String key, Object value) {
if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) {
this.others.put(key, value);
@@ -895,7 +901,7 @@ public class HoodieTableMetaClient implements Serializable {
}
public PropertyBuilder set(Map<String, Object> props) {
- for (String key: HoodieTableConfig.PERSISTED_CONFIG_LIST) {
+ for (String key : HoodieTableConfig.PERSISTED_CONFIG_LIST) {
Object value = props.get(key);
if (value != null) {
set(key, value);
@@ -1000,6 +1006,9 @@ public class HoodieTableMetaClient implements Serializable {
if (hoodieConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT)) {
setInflightMetadataPartitions(hoodieConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT));
}
+ if (hoodieConfig.contains(HoodieTableConfig.SECONDARY_INDEXES_METADATA)) {
+ setSecondaryIndexesMetadata(hoodieConfig.getString(HoodieTableConfig.SECONDARY_INDEXES_METADATA));
+ }
return this;
}
@@ -1096,6 +1105,9 @@ public class HoodieTableMetaClient implements Serializable {
if (null != inflightMetadataPartitions) {
tableConfig.setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT, inflightMetadataPartitions);
}
+ if (null != secondaryIndexesMetadata) {
+ tableConfig.setValue(HoodieTableConfig.SECONDARY_INDEXES_METADATA, secondaryIndexesMetadata);
+ }
return tableConfig.getProps();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieSecondaryIndexException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieSecondaryIndexException.java
new file mode 100644
index 0000000000..361416c909
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieSecondaryIndexException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.exception;
+
+public class HoodieSecondaryIndexException extends HoodieException {
+ public HoodieSecondaryIndexException(String message) {
+ super(message);
+ }
+ // Overload that also records the underlying cause.
+ public HoodieSecondaryIndexException(String message, Throwable t) {
+ super(message, t);
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndex.java b/hudi-common/src/main/java/org/apache/hudi/secondary/index/HoodieSecondaryIndex.java
similarity index 54%
rename from hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndex.java
rename to hudi-common/src/main/java/org/apache/hudi/secondary/index/HoodieSecondaryIndex.java
index 6dabb1a41f..8b50d9268e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/secondary/index/HoodieSecondaryIndex.java
@@ -17,48 +17,48 @@
* under the License.
*/
-package org.apache.hudi.common.index;
+package org.apache.hudi.secondary.index;
-import java.util.Arrays;
+import org.apache.hudi.exception.HoodieSecondaryIndexException;
+
+import java.util.Comparator;
+import java.util.LinkedHashMap;
import java.util.Map;
-public class HoodieIndex {
+public class HoodieSecondaryIndex {
private String indexName;
- private String[] colNames;
- private HoodieIndexType indexType;
- private Map<String, Map<String, String>> colOptions;
+ private SecondaryIndexType indexType;
+
+ // The index fields need to be in order
+ private LinkedHashMap<String, Map<String, String>> columns;
private Map<String, String> options;
- public HoodieIndex() {
+ public HoodieSecondaryIndex() {
}
- public HoodieIndex(
+ public HoodieSecondaryIndex(
String indexName,
- String[] colNames,
- HoodieIndexType indexType,
- Map<String, Map<String, String>> colOptions,
+ SecondaryIndexType indexType,
+ LinkedHashMap<String, Map<String, String>> columns,
Map<String, String> options) {
this.indexName = indexName;
- this.colNames = colNames;
this.indexType = indexType;
- this.colOptions = colOptions;
+ this.columns = columns;
this.options = options;
+
+ validate();
}
public String getIndexName() {
return indexName;
}
- public String[] getColNames() {
- return colNames;
- }
-
- public HoodieIndexType getIndexType() {
+ public SecondaryIndexType getIndexType() {
return indexType;
}
- public Map<String, Map<String, String>> getColOptions() {
- return colOptions;
+ public Map<String, Map<String, String>> getColumns() {
+ return columns;
}
public Map<String, String> getOptions() {
@@ -69,22 +69,32 @@ public class HoodieIndex {
return new Builder();
}
+ private void validate() {
+ switch (indexType) {
+ case LUCENE:
+ if (columns.size() != 1) {
+ throw new HoodieSecondaryIndexException("Lucene index only support single column");
+ }
+ break;
+ default:
+ return;
+ }
+ }
+
@Override
public String toString() {
return "HoodieIndex{"
+ "indexName='" + indexName + '\''
- + ", colNames='" + Arrays.toString(colNames) + '\''
+ ", indexType=" + indexType
- + ", colOptions=" + colOptions
+ + ", columns=" + columns
+ ", options=" + options
+ '}';
}
public static class Builder {
private String indexName;
- private String[] colNames;
- private HoodieIndexType indexType;
- private Map<String, Map<String, String>> colOptions;
+ private SecondaryIndexType indexType;
+ private LinkedHashMap<String, Map<String, String>> columns;
private Map<String, String> options;
public Builder setIndexName(String indexName) {
@@ -92,18 +102,13 @@ public class HoodieIndex {
return this;
}
- public Builder setColNames(String[] colNames) {
- this.colNames = colNames;
- return this;
- }
-
public Builder setIndexType(String indexType) {
- this.indexType = HoodieIndexType.of(indexType);
+ this.indexType = SecondaryIndexType.of(indexType);
return this;
}
- public Builder setColOptions(Map<String, Map<String, String>> colOptions) {
- this.colOptions = colOptions;
+ public Builder setColumns(LinkedHashMap<String, Map<String, String>> columns) {
+ this.columns = columns;
return this;
}
@@ -112,8 +117,15 @@ public class HoodieIndex {
return this;
}
- public HoodieIndex build() {
- return new HoodieIndex(indexName, colNames, indexType, colOptions, options);
+ public HoodieSecondaryIndex build() {
+ return new HoodieSecondaryIndex(indexName, indexType, columns, options);
+ }
+ }
+
+ public static class HoodieIndexCompactor implements Comparator<HoodieSecondaryIndex> {
+ @Override
+ public int compare(HoodieSecondaryIndex o1, HoodieSecondaryIndex o2) {
+ return o1.indexName.compareTo(o2.indexName);
}
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexManager.java b/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexManager.java
new file mode 100644
index 0000000000..61bd7d7621
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexManager.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.secondary.index;
+
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieSecondaryIndexException;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.secondary.index.SecondaryIndexUtils.getSecondaryIndexes;
+
+public class SecondaryIndexManager {
+ private static final Logger LOG = LoggerFactory.getLogger(SecondaryIndexManager.class);
+ // Lazily-initialized singleton instance, see getInstance().
+ private static volatile SecondaryIndexManager _instance;
+
+ private SecondaryIndexManager() {
+ }
+ // Double-checked locking; _instance is volatile so the unsynchronized first read is safe.
+ public static SecondaryIndexManager getInstance() {
+ if (_instance == null) {
+ synchronized (SecondaryIndexManager.class) {
+ if (_instance == null) {
+ _instance = new SecondaryIndexManager();
+ }
+ }
+ }
+
+ return _instance;
+ }
+
+ /**
+ * Create a secondary index for a hoodie table. Two steps are performed:
+ * 1. Add the secondary index metadata to hoodie.properties
+ * 2. Trigger building the secondary index (not implemented yet, see the TODO at the end)
+ *
+ * @param metaClient Hoodie table meta client
+ * @param indexName The unique secondary index name
+ * @param indexType Index type
+ * @param ignoreIfExists Whether to return silently instead of throwing if an equivalent index exists
+ * @param columns The columns referenced by this secondary index, in order; each column
+ * has its own options
+ * @param options Options for this secondary index
+ */
+ public void create(
+ HoodieTableMetaClient metaClient,
+ String indexName,
+ String indexType,
+ boolean ignoreIfExists,
+ LinkedHashMap<String, Map<String, String>> columns,
+ Map<String, String> options) {
+ Option<List<HoodieSecondaryIndex>> secondaryIndexes = getSecondaryIndexes(metaClient);
+ Set<String> colNames = columns.keySet();
+ Schema avroSchema;
+ try {
+ avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema(false);
+ } catch (Exception e) { // NOTE(review): e is not chained as the cause of the exception below
+ throw new HoodieSecondaryIndexException(
+ "Failed to get table avro schema: " + metaClient.getTableConfig().getTableName());
+ }
+
+ for (String col : colNames) {
+ if (avroSchema.getField(col) == null) {
+ throw new HoodieSecondaryIndexException("Field not exists: " + col);
+ }
+ }
+
+ if (indexExists(secondaryIndexes, indexName, Option.of(indexType), Option.of(colNames))) {
+ if (ignoreIfExists) {
+ return;
+ } else {
+ throw new HoodieSecondaryIndexException("Secondary index already exists: " + indexName);
+ }
+ }
+
+ HoodieSecondaryIndex secondaryIndexToAdd = HoodieSecondaryIndex.builder()
+ .setIndexName(indexName)
+ .setIndexType(indexType)
+ .setColumns(columns)
+ .setOptions(options)
+ .build();
+
+ List<HoodieSecondaryIndex> newSecondaryIndexes = secondaryIndexes.map(h -> {
+ h.add(secondaryIndexToAdd);
+ return h;
+ }).orElse(Collections.singletonList(secondaryIndexToAdd));
+ newSecondaryIndexes.sort(new HoodieSecondaryIndex.HoodieIndexCompactor());
+
+ // Persist the secondary indexes' metadata to the hoodie.properties file
+ Properties updatedProps = new Properties();
+ updatedProps.put(HoodieTableConfig.SECONDARY_INDEXES_METADATA.key(),
+ SecondaryIndexUtils.toJsonString(newSecondaryIndexes));
+ HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), updatedProps);
+
+ LOG.info("Success to add secondary index metadata: {}", secondaryIndexToAdd);
+
+ // TODO: build index
+ }
+
+ /**
+ * Drop a secondary index by name, removing it from the secondary index metadata
+ *
+ * @param metaClient Hoodie table meta client
+ * @param indexName The unique secondary index name
+ * @param ignoreIfNotExists Whether to return silently instead of throwing if no such index exists
+ */
+ public void drop(HoodieTableMetaClient metaClient, String indexName, boolean ignoreIfNotExists) {
+ Option<List<HoodieSecondaryIndex>> secondaryIndexes = getSecondaryIndexes(metaClient);
+ if (!indexExists(secondaryIndexes, indexName, Option.empty(), Option.empty())) {
+ if (ignoreIfNotExists) {
+ return;
+ } else {
+ throw new HoodieSecondaryIndexException("Secondary index not exists: " + indexName);
+ }
+ }
+
+ List<HoodieSecondaryIndex> secondaryIndexesToKeep = secondaryIndexes.get().stream()
+ .filter(i -> !i.getIndexName().equals(indexName))
+ .sorted(new HoodieSecondaryIndex.HoodieIndexCompactor())
+ .collect(Collectors.toList());
+ if (CollectionUtils.nonEmpty(secondaryIndexesToKeep)) {
+ Properties updatedProps = new Properties();
+ updatedProps.put(HoodieTableConfig.SECONDARY_INDEXES_METADATA.key(),
+ SecondaryIndexUtils.toJsonString(secondaryIndexesToKeep));
+ HoodieTableConfig.update(metaClient.getFs(), new Path(metaClient.getMetaPath()), updatedProps);
+ } else {
+ HoodieTableConfig.delete(metaClient.getFs(), new Path(metaClient.getMetaPath()),
+ CollectionUtils.createSet(HoodieTableConfig.SECONDARY_INDEXES_METADATA.key()));
+ }
+
+ LOG.info("Success to delete secondary index metadata: {}", indexName);
+
+ // TODO: drop index data
+ }
+
+ /**
+ * Show the secondary indexes recorded in the hoodie table config
+ *
+ * @param metaClient Hoodie table meta client
+ * @return The secondary indexes of this table, or empty if none are recorded
+ */
+ public Option<List<HoodieSecondaryIndex>> show(HoodieTableMetaClient metaClient) {
+ return getSecondaryIndexes(metaClient);
+ }
+
+ /**
+ * Refresh the specific secondary index. Not implemented yet: currently a no-op.
+ *
+ * @param metaClient Hoodie table meta client
+ * @param indexName The target secondary index name
+ */
+ public void refresh(HoodieTableMetaClient metaClient, String indexName) {
+ // TODO: implement refresh
+ }
+
+ /**
+ * Check if a matching secondary index exists. When dropping an index only the index
+ * name is compared; when adding one, an index with a different name but the same
+ * index type and columns is also treated as existing.
+ * NOTE(review): the column check is CollectionUtils.diff(existing, requested).isEmpty(); confirm it is a set-equality test.
+ * @param secondaryIndexes Current secondary indexes in this table
+ * @param indexName The index name of target secondary index
+ * @param indexType The index type of target secondary index
+ * @param colNames The column names of target secondary index
+ * @return true if secondary index exists
+ */
+ private boolean indexExists(
+ Option<List<HoodieSecondaryIndex>> secondaryIndexes,
+ String indexName,
+ Option<String> indexType,
+ Option<Set<String>> colNames) {
+ return secondaryIndexes.map(indexes ->
+ indexes.stream().anyMatch(index -> {
+ if (index.getIndexName().equals(indexName)) {
+ return true;
+ } else if (indexType.isPresent() && colNames.isPresent()) {
+ // When secondary index names are different, we should check index type
+ // and index columns to avoid repeatedly creating the same index.
+ // For example:
+ // create index idx_name on test using lucene (name);
+ // create index idx_name_1 on test using lucene (name);
+ return index.getIndexType().name().equalsIgnoreCase(indexType.get())
+ && CollectionUtils.diff(index.getColumns().keySet(), colNames.get()).isEmpty();
+ }
+
+ return false;
+ })).orElse(false);
+ }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndexType.java b/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexType.java
similarity index 80%
rename from hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndexType.java
rename to hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexType.java
index 03618a7679..108d2effce 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndexType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexType.java
@@ -17,18 +17,18 @@
* under the License.
*/
-package org.apache.hudi.common.index;
+package org.apache.hudi.secondary.index;
import org.apache.hudi.exception.HoodieIndexException;
import java.util.Arrays;
-public enum HoodieIndexType {
+public enum SecondaryIndexType {
LUCENE((byte) 1);
private final byte type;
- HoodieIndexType(byte type) {
+ SecondaryIndexType(byte type) {
this.type = type;
}
@@ -36,16 +36,16 @@ public enum HoodieIndexType {
return type;
}
- public static HoodieIndexType of(byte indexType) {
- return Arrays.stream(HoodieIndexType.values())
+ public static SecondaryIndexType of(byte indexType) {
+ return Arrays.stream(SecondaryIndexType.values())
.filter(t -> t.type == indexType)
.findAny()
.orElseThrow(() ->
new HoodieIndexException("Unknown hoodie index type:" + indexType));
}
- public static HoodieIndexType of(String indexType) {
- return Arrays.stream(HoodieIndexType.values())
+ public static SecondaryIndexType of(String indexType) {
+ return Arrays.stream(SecondaryIndexType.values())
.filter(t -> t.name().equals(indexType.toUpperCase()))
.findAny()
.orElseThrow(() ->
diff --git a/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexUtils.java b/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexUtils.java
new file mode 100644
index 0000000000..40b2ee9124
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/secondary/index/SecondaryIndexUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.secondary.index;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.exception.HoodieSecondaryIndexException;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.List;
+
+public class SecondaryIndexUtils {
+ // Helpers for reading and (de)serializing secondary index metadata as JSON.
+ /**
+ * Get the secondary index metadata of this table from its table config
+ *
+ * @param metaClient HoodieTableMetaClient
+ * @return The parsed secondary indexes, or empty if no metadata is recorded
+ */
+ public static Option<List<HoodieSecondaryIndex>> getSecondaryIndexes(HoodieTableMetaClient metaClient) {
+ Option<String> indexesMetadata = metaClient.getTableConfig().getSecondaryIndexesMetadata();
+ return indexesMetadata.map(SecondaryIndexUtils::fromJsonString);
+ }
+
+ /**
+ * Parse JSON-encoded secondary index metadata into a {@code List<HoodieSecondaryIndex>}
+ *
+ * @param jsonStr Secondary indexes in JSON format
+ * @return The parsed indexes, or null if jsonStr is null or empty (parse failures throw HoodieSecondaryIndexException)
+ */
+ public static List<HoodieSecondaryIndex> fromJsonString(String jsonStr) {
+ try {
+ return SecondaryIndexUtils.fromJsonString(jsonStr,
+ new TypeReference<List<HoodieSecondaryIndex>>() {
+ });
+ } catch (Exception e) {
+ throw new HoodieSecondaryIndexException("Fail to get secondary indexes", e);
+ }
+ }
+ /** Serialize value to JSON. NOTE(review): wraps failures in HoodieIndexException, not HoodieSecondaryIndexException. */
+ public static String toJsonString(Object value) {
+ try {
+ return getObjectMapper().writeValueAsString(value);
+ } catch (JsonProcessingException e) {
+ throw new HoodieIndexException("Fail to convert object to json string", e);
+ }
+ }
+ /** Deserialize jsonStr into the given type; returns null for a null or empty string. */
+ public static <T> T fromJsonString(String jsonStr, TypeReference<T> type) throws Exception {
+ if (jsonStr == null || jsonStr.isEmpty()) {
+ return null;
+ }
+
+ return getObjectMapper().readValue(jsonStr, type);
+ }
+ /** Create a new ObjectMapper (field visibility, unknown properties ignored); a fresh instance on every call. */
+ public static ObjectMapper getObjectMapper() {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+ mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+ return mapper;
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
index 12ee2e8058..1cc8c99728 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
@@ -31,7 +31,7 @@ case class CreateIndex(
indexType: String,
ignoreIfExists: Boolean,
columns: Seq[(Attribute, Map[String, String])],
- properties: Map[String, String],
+ options: Map[String, String],
override val output: Seq[Attribute] = CreateIndex.getOutputAttrs) extends Command {
override def children: Seq[LogicalPlan] = Seq(table)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index c8add03098..c5688965d7 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -191,10 +191,10 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
}
// Convert to CreateIndexCommand
- case CreateIndex(table, indexName, indexType, ignoreIfExists, columns, properties, output)
+ case CreateIndex(table, indexName, indexType, ignoreIfExists, columns, options, output)
if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) =>
CreateIndexCommand(
- getTableIdentifier(table), indexName, indexType, ignoreIfExists, columns, properties, output)
+ getTableIdentifier(table), indexName, indexType, ignoreIfExists, columns, options, output)
// Convert to DropIndexCommand
case DropIndex(table, indexName, ignoreIfNotExists, output)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
index 5d73af31a9..8a3b5630b6 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala
@@ -19,23 +19,45 @@
package org.apache.spark.sql.hudi.command
-import org.apache.hudi.common.index.HoodieIndex
+import com.fasterxml.jackson.annotation.{JsonAutoDetect, PropertyAccessor}
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.secondary.index.SecondaryIndexManager
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation
import org.apache.spark.sql.{Row, SparkSession}
+import java.util
+
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter}
+
case class CreateIndexCommand(
tableId: TableIdentifier,
indexName: String,
indexType: String,
ignoreIfExists: Boolean,
columns: Seq[(Attribute, Map[String, String])],
- properties: Map[String, String],
+ options: Map[String, String],
override val output: Seq[Attribute]) extends IndexBaseCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
- // The implementation for different index type
+ val metaClient = createHoodieTableMetaClient(tableId, sparkSession)
+ val columnsMap: java.util.LinkedHashMap[String, java.util.Map[String, String]] =
+ new util.LinkedHashMap[String, java.util.Map[String, String]]()
+ columns.map(c => columnsMap.put(c._1.name, c._2.asJava))
+
+ SecondaryIndexManager.getInstance().create(
+ metaClient, indexName, indexType, ignoreIfExists, columnsMap, options.asJava)
+
+ // Invalidate cached table for queries do not access related table
+ // through {@code DefaultSource}
+ val qualifiedTableName = QualifiedTableName(
+ tableId.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
+ tableId.table)
+ sparkSession.sessionState.catalog.invalidateCachedTable(qualifiedTableName)
Seq.empty
}
}
@@ -47,7 +69,15 @@ case class DropIndexCommand(
override val output: Seq[Attribute]) extends IndexBaseCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
- // The implementation for different index type
+ val metaClient = createHoodieTableMetaClient(tableId, sparkSession)
+ SecondaryIndexManager.getInstance().drop(metaClient, indexName, ignoreIfNotExists)
+
+ // Invalidate cached table for queries do not access related table
+ // through {@code DefaultSource}
+ val qualifiedTableName = QualifiedTableName(
+ tableId.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
+ tableId.table)
+ sparkSession.sessionState.catalog.invalidateCachedTable(qualifiedTableName)
Seq.empty
}
}
@@ -57,8 +87,25 @@ case class ShowIndexesCommand(
override val output: Seq[Attribute]) extends IndexBaseCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
- // The implementation for different index type
- Seq.empty
+ val metaClient = createHoodieTableMetaClient(tableId, sparkSession)
+ val secondaryIndexes = SecondaryIndexManager.getInstance().show(metaClient)
+
+ val mapper = getObjectMapper
+ toScalaOption(secondaryIndexes).map(x =>
+ x.asScala.map(i => {
+ val colOptions =
+ if (i.getColumns.values().asScala.forall(_.isEmpty)) "" else mapper.writeValueAsString(i.getColumns)
+ val options = if (i.getOptions.isEmpty) "" else mapper.writeValueAsString(i.getOptions)
+ Row(i.getIndexName, i.getColumns.keySet().asScala.mkString(","),
+ i.getIndexType.name().toLowerCase, colOptions, options)
+ }).toSeq).getOrElse(Seq.empty[Row])
+ }
+
+ protected def getObjectMapper: ObjectMapper = {
+ val mapper = new ObjectMapper
+ mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+ mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
+ mapper
}
}
@@ -68,7 +115,8 @@ case class RefreshIndexCommand(
override val output: Seq[Attribute]) extends IndexBaseCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
- // The implementation for different index type
+ val metaClient = createHoodieTableMetaClient(tableId, sparkSession)
+ SecondaryIndexManager.getInstance().refresh(metaClient, indexName)
Seq.empty
}
}
@@ -76,26 +124,21 @@ case class RefreshIndexCommand(
abstract class IndexBaseCommand extends HoodieLeafRunnableCommand with Logging {
/**
- * Check hoodie index exists. In a hoodie table, hoodie index name
- * must be unique, so the index name will be checked firstly,
+ * Create a hoodie table meta client for the given table, resolving its base
+ * path through the session catalog
*
- * @param secondaryIndexes Current hoodie indexes
- * @param indexName The index name to be checked
- * @param colNames The column names to be checked
- * @return true if the index exists
+ * @param tableId The table identifier, looked up in the session catalog
+ * @param sparkSession The spark session whose catalog and Hadoop conf are used
+ * @return The hoodie table meta client built with the session's Hadoop configuration
*/
- def indexExists(
- secondaryIndexes: Option[Array[HoodieIndex]],
- indexName: String,
- indexType: Option[String] = None,
- colNames: Option[Array[String]] = None): Boolean = {
- secondaryIndexes.exists(i => {
- i.exists(_.getIndexName.equals(indexName)) ||
- // Index type and column name need to be checked if present
- indexType.exists(t =>
- colNames.exists(c =>
- i.exists(index =>
- index.getIndexType.name().equalsIgnoreCase(t) && index.getColNames.sameElements(c))))
- })
+ def createHoodieTableMetaClient(
+ tableId: TableIdentifier,
+ sparkSession: SparkSession): HoodieTableMetaClient = {
+ val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
+ val basePath = getTableLocation(catalogTable, sparkSession)
+ HoodieTableMetaClient.builder()
+ .setConf(sparkSession.sessionState.newHadoopConf()) // honors session-level Hadoop/SQL conf overrides
+ .setBasePath(basePath)
+ .build()
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala
index 3536ae9e0a..537d3ad6a3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala
@@ -64,7 +64,7 @@ class TestIndexSyntax extends HoodieSparkSqlTestBase {
assertResult("idx_name")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName)
assertResult("lucene")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType)
assertResult(false)(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].ignoreIfExists)
- assertResult(Map("block_size" -> "1024"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].properties)
+ assertResult(Map("block_size" -> "1024"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].options)
logicalPlan = sqlParser.parsePlan(s"create index if not exists idx_price on $tableName using lucene (price options(order='desc')) options(block_size=512)")
resolvedLogicalPlan = analyzer.execute(logicalPlan)
@@ -72,7 +72,7 @@ class TestIndexSyntax extends HoodieSparkSqlTestBase {
assertResult("idx_price")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName)
assertResult("lucene")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType)
assertResult(Map("order" -> "desc"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].columns.head._2)
- assertResult(Map("block_size" -> "512"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].properties)
+ assertResult(Map("block_size" -> "512"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].options)
logicalPlan = sqlParser.parsePlan(s"drop index if exists idx_name on $tableName")
resolvedLogicalPlan = analyzer.execute(logicalPlan)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
new file mode 100644
index 0000000000..eae89099a6
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.index
+
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+class TestSecondaryIndex extends HoodieSparkSqlTestBase {
+ test("Test Create/Show/Drop Secondary Index") {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | options (
+ | primaryKey ='id',
+ | type = '$tableType',
+ | preCombineField = 'ts'
+ | )
+ | partitioned by(ts)
+ | location '$basePath'
+ """.stripMargin)
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+ checkAnswer(s"show indexes from default.$tableName")()
+
+ checkAnswer(s"create index idx_name on $tableName using lucene (name) options(block_size=1024)")()
+ checkAnswer(s"create index idx_price on $tableName using lucene (price options(order='desc')) options(block_size=512)")()
+
+ // Create an index with multiple columns
+ checkException(s"create index idx_id_ts on $tableName using lucene (id, ts)")("Lucene index only support single column")
+
+ // Create an index whose name is already taken
+ checkException(s"create index idx_price on $tableName using lucene (price)")(
+ "Secondary index already exists: idx_price"
+ )
+
+ // Create an index on an already-indexed column with the same index type, under a different name
+ checkException(s"create index idx_price_1 on $tableName using lucene (price)")(
+ "Secondary index already exists: idx_price_1"
+ )
+
+ // Both indexes are listed; column and index options are rendered as JSON
+ checkAnswer(s"show indexes from $tableName")(
+ Seq("idx_name", "name", "lucene", "", "{\"block_size\":\"1024\"}"),
+ Seq("idx_price", "price", "lucene", "{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}")
+ )
+
+ checkAnswer(s"drop index idx_name on $tableName")()
+ checkException(s"drop index idx_name on $tableName")("Secondary index not exists: idx_name")
+
+ // Only idx_price remains after dropping idx_name
+ checkAnswer(s"show indexes from $tableName")(
+ Seq("idx_price", "price", "lucene", "{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}")
+ )
+
+ checkAnswer(s"drop index idx_price on $tableName")()
+ checkAnswer(s"show indexes from $tableName")()
+
+ checkException(s"drop index idx_price on $tableName")("Secondary index not exists: idx_price")
+
+ checkException(s"create index idx_price_1 on $tableName using lucene (field_not_exist)")(
+ "Field not exists: field_not_exist"
+ )
+ }
+ }
+ }
+}