You are viewing a plain text version of this content; the canonical hyperlink was present in the original HTML message but is not reproduced in this plain-text rendering.
Posted to commits@hudi.apache.org by co...@apache.org on 2022/03/20 04:54:42 UTC
[hudi] branch master updated: [HUDI-3663] Fixing Column Stats index to properly handle first Data Table commit (#5070)
This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1b6e201 [HUDI-3663] Fixing Column Stats index to properly handle first Data Table commit (#5070)
1b6e201 is described below
commit 1b6e201160ef18b0c7d86d1cafd104609489dad7
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Sat Mar 19 21:54:13 2022 -0700
[HUDI-3663] Fixing Column Stats index to properly handle first Data Table commit (#5070)
* Fixed metadata conversion util to extract schema from `HoodieCommitMetadata`
* Fixed failure to fetch columns to index in empty table
* Abort the indexing sequence in case there are no columns to index
* Fall back to indexing at least the primary-key columns, in case no writer schema could be obtained to index all columns
* Fixed `getRecordFields` incorrectly ignoring default value
* Make sure Hudi metadata fields are also indexed
---
.../hudi/client/functional/TestHoodieIndex.java | 19 ++-
.../hudi/common/table/HoodieTableConfig.java | 8 +-
.../java/org/apache/hudi/common/util/Option.java | 16 ++-
.../hudi/metadata/HoodieTableMetadataUtil.java | 142 +++++++++++++--------
4 files changed, 119 insertions(+), 66 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
index 876a5d8..8c27e48 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
@@ -72,6 +72,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import scala.Tuple2;
@@ -258,19 +259,23 @@ public class TestHoodieIndex extends TestHoodieMetadataBase {
metaClient = HoodieTableMetaClient.reload(metaClient);
// Insert 200 records
- JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
- Assertions.assertNoWriteErrors(writeStatues.collect());
- List<String> fileIds = writeStatues.map(WriteStatus::getFileId).collect();
- // commit this upsert
- writeClient.commit(newCommitTime, writeStatues);
+ JavaRDD<WriteStatus> writeStatusesRDD = writeClient.upsert(writeRecords, newCommitTime);
+ // NOTE: This will trigger an actual write
+ List<WriteStatus> writeStatuses = writeStatusesRDD.collect();
+ Assertions.assertNoWriteErrors(writeStatuses);
+ // Commit
+ writeClient.commit(newCommitTime, jsc.parallelize(writeStatuses));
+
+ List<String> fileIds = writeStatuses.stream().map(WriteStatus::getFileId).collect(Collectors.toList());
+
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
// Now tagLocation for these records, hbaseIndex should tag them
JavaRDD<HoodieRecord> javaRDD = tagLocation(index, writeRecords, hoodieTable);
- assert (javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size() == totalRecords);
+ assertEquals(totalRecords, javaRDD.filter(HoodieRecord::isCurrentLocationKnown).collect().size());
// check tagged records are tagged with correct fileIds
- assert (javaRDD.filter(record -> record.getCurrentLocation().getFileId() == null).collect().size() == 0);
+ assertEquals(0, javaRDD.filter(record -> record.getCurrentLocation().getFileId() == null).collect().size());
List<String> taggedFileIds = javaRDD.map(record -> record.getCurrentLocation().getFileId()).distinct().collect();
Map<String, String> recordKeyToPartitionPathMap = new HashMap();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 077cc81..bc8a5c4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -461,11 +461,9 @@ public class HoodieTableConfig extends HoodieConfig {
}
public Option<String[]> getRecordKeyFields() {
- if (contains(RECORDKEY_FIELDS)) {
- return Option.of(Arrays.stream(getString(RECORDKEY_FIELDS).split(","))
- .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[] {}));
- }
- return Option.empty();
+ String keyFieldsValue = getStringOrDefault(RECORDKEY_FIELDS, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ return Option.of(Arrays.stream(keyFieldsValue.split(","))
+ .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[] {}));
}
public Option<String[]> getPartitionFields() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java b/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java
index 193bf53..3d4bfcb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/Option.java
@@ -34,7 +34,7 @@ public final class Option<T> implements Serializable {
private static final long serialVersionUID = 0L;
- private static final Option<?> NULL_VAL = new Option<>();
+ private static final Option<?> EMPTY = new Option<>();
private final T val;
@@ -67,8 +67,9 @@ public final class Option<T> implements Serializable {
this.val = val;
}
+ @SuppressWarnings("unchecked")
public static <T> Option<T> empty() {
- return (Option<T>) NULL_VAL;
+ return (Option<T>) EMPTY;
}
public static <T> Option<T> of(T value) {
@@ -108,6 +109,17 @@ public final class Option<T> implements Serializable {
}
}
+ public <U> Option<U> flatMap(Function<? super T, Option<U>> mapper) {
+ if (null == mapper) {
+ throw new NullPointerException("mapper should not be null");
+ }
+ if (!isPresent()) {
+ return empty();
+ } else {
+ return Objects.requireNonNull(mapper.apply(val));
+ }
+ }
+
/**
* Returns this {@link Option} if not empty, otherwise evaluates the provided supplier
* and returns the alternative
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index f0388cc..4390e87 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -67,6 +68,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -78,6 +80,7 @@ import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldValAsString;
import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.COLUMN_RANGE_MERGE_FUNCTION;
import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.MAX;
@@ -86,6 +89,7 @@ import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.NULL_
import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_SIZE;
import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_UNCOMPRESSED_SIZE;
import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.VALUE_COUNT;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME;
import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
@@ -379,15 +383,24 @@ public class HoodieTableMetadataUtil {
deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition, entry)));
});
- final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), recordsGenerationParams.isAllColumnStatsIndexEnabled());
- final int parallelism = Math.max(Math.min(deleteFileList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
- HoodieData<Pair<String, String>> deleteFileListRDD = engineContext.parallelize(deleteFileList, parallelism);
- return deleteFileListRDD.flatMap(deleteFileInfoPair -> {
- if (deleteFileInfoPair.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
- return getColumnStats(deleteFileInfoPair.getLeft(), deleteFileInfoPair.getRight(), recordsGenerationParams.getDataMetaClient(), columnsToIndex, true).iterator();
- }
- return Collections.emptyListIterator();
- });
+ HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
+
+ List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams,
+ dataTableMetaClient.getTableConfig(), tryResolveSchemaForTable(dataTableMetaClient));
+
+ if (columnsToIndex.isEmpty()) {
+ // In case there are no columns to index, bail
+ return engineContext.emptyHoodieData();
+ }
+
+ int parallelism = Math.max(Math.min(deleteFileList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
+ return engineContext.parallelize(deleteFileList, parallelism)
+ .flatMap(deleteFileInfoPair -> {
+ if (deleteFileInfoPair.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return getColumnStats(deleteFileInfoPair.getLeft(), deleteFileInfoPair.getRight(), dataTableMetaClient, columnsToIndex, true).iterator();
+ }
+ return Collections.emptyListIterator();
+ });
}
/**
@@ -698,7 +711,15 @@ public class HoodieTableMetadataUtil {
Map<String, Map<String, Long>> partitionToAppendedFiles,
MetadataRecordsGenerationParams recordsGenerationParams) {
HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
- final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), recordsGenerationParams.isAllColumnStatsIndexEnabled());
+ HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
+
+ final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams,
+ dataTableMetaClient.getTableConfig(), tryResolveSchemaForTable(dataTableMetaClient));
+
+ if (columnsToIndex.isEmpty()) {
+ // In case there are no columns to index, bail
+ return engineContext.emptyHoodieData();
+ }
final List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet()
.stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList());
@@ -712,7 +733,7 @@ public class HoodieTableMetadataUtil {
return deletedFileList.stream().flatMap(deletedFile -> {
final String filePathWithPartition = partitionName + "/" + deletedFile;
- return getColumnStats(partition, filePathWithPartition, recordsGenerationParams.getDataMetaClient(), columnsToIndex, true);
+ return getColumnStats(partition, filePathWithPartition, dataTableMetaClient, columnsToIndex, true);
}).iterator();
});
allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
@@ -733,7 +754,7 @@ public class HoodieTableMetadataUtil {
return Stream.empty();
}
final String filePathWithPartition = partitionName + "/" + appendedFileNameLengthEntry.getKey();
- return getColumnStats(partition, filePathWithPartition, recordsGenerationParams.getDataMetaClient(), columnsToIndex, false);
+ return getColumnStats(partition, filePathWithPartition, dataTableMetaClient, columnsToIndex, false);
}).iterator();
});
@@ -838,55 +859,59 @@ public class HoodieTableMetadataUtil {
public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
HoodieEngineContext engineContext,
MetadataRecordsGenerationParams recordsGenerationParams) {
- try {
- List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
- .flatMap(entry -> entry.stream()).collect(Collectors.toList());
- return HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext, allWriteStats, recordsGenerationParams);
- } catch (Exception e) {
- throw new HoodieException("Failed to generate column stats records for metadata table ", e);
- }
- }
+ List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
- /**
- * Create column stats from write status.
- *
- * @param engineContext - Engine context
- * @param allWriteStats - Write status to convert
- * @param recordsGenerationParams - Parameters for columns stats record generation
- */
- public static HoodieData<HoodieRecord> createColumnStatsFromWriteStats(HoodieEngineContext engineContext,
- List<HoodieWriteStat> allWriteStats,
- MetadataRecordsGenerationParams recordsGenerationParams) {
if (allWriteStats.isEmpty()) {
return engineContext.emptyHoodieData();
}
- final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams.getDataMetaClient(), recordsGenerationParams.isAllColumnStatsIndexEnabled());
- final int parallelism = Math.max(Math.min(allWriteStats.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
- HoodieData<HoodieWriteStat> allWriteStatsRDD = engineContext.parallelize(allWriteStats, parallelism);
- return allWriteStatsRDD.flatMap(writeStat -> translateWriteStatToColumnStats(writeStat, recordsGenerationParams.getDataMetaClient(), columnsToIndex).iterator());
+
+ try {
+ Option<Schema> writerSchema =
+ Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+ .flatMap(writerSchemaStr ->
+ isNullOrEmpty(writerSchemaStr)
+ ? Option.empty()
+ : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+
+ HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
+ HoodieTableConfig tableConfig = dataTableMetaClient.getTableConfig();
+
+ // NOTE: Writer schema added to commit metadata will not contain Hudi's metadata fields
+ Option<Schema> tableSchema = writerSchema.map(schema ->
+ tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema);
+
+ List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams,
+ tableConfig, tableSchema);
+
+ if (columnsToIndex.isEmpty()) {
+ // In case there are no columns to index, bail
+ return engineContext.emptyHoodieData();
+ }
+
+ int parallelism = Math.max(Math.min(allWriteStats.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
+ return engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat ->
+ translateWriteStatToColumnStats(writeStat, dataTableMetaClient, columnsToIndex).iterator());
+ } catch (Exception e) {
+ throw new HoodieException("Failed to generate column stats records for metadata table", e);
+ }
}
/**
* Get the latest columns for the table for column stats indexing.
- *
- * @param datasetMetaClient - Data table meta client
- * @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing enabled for all columns
*/
- private static List<String> getColumnsToIndex(HoodieTableMetaClient datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
- if (!isMetaIndexColumnStatsForAllColumns
- || datasetMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants() < 1) {
- return Arrays.asList(datasetMetaClient.getTableConfig().getRecordKeyFieldProp().split(","));
+ private static List<String> getColumnsToIndex(MetadataRecordsGenerationParams recordsGenParams,
+ HoodieTableConfig tableConfig,
+ Option<Schema> writerSchemaOpt) {
+ if (recordsGenParams.isAllColumnStatsIndexEnabled() && writerSchemaOpt.isPresent()) {
+ return writerSchemaOpt.get().getFields().stream()
+ .map(Schema.Field::name).collect(Collectors.toList());
}
- TableSchemaResolver schemaResolver = new TableSchemaResolver(datasetMetaClient);
- // consider nested fields as well. if column stats is enabled only for a subset of columns,
- // directly use them instead of all columns from the latest table schema
- try {
- return schemaResolver.getTableAvroSchema().getFields().stream()
- .map(entry -> entry.name()).collect(Collectors.toList());
- } catch (Exception e) {
- throw new HoodieException("Failed to get latest columns for " + datasetMetaClient.getBasePath());
- }
+ // In case no writer schema could be obtained we fall back to only index primary key
+ // columns
+ return Arrays.asList(tableConfig.getRecordKeyFields().get());
}
public static HoodieMetadataColumnStats mergeColumnStats(HoodieMetadataColumnStats oldColumnStats, HoodieMetadataColumnStats newColumnStats) {
@@ -914,7 +939,7 @@ public class HoodieTableMetadataUtil {
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>(columnRangeMap.values());
return HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), columnRangeMetadataList, false);
}
- return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex,false);
+ return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
}
private static Stream<HoodieRecord> getColumnStats(final String partitionPath, final String filePathWithPartition,
@@ -1023,7 +1048,7 @@ public class HoodieTableMetadataUtil {
columnStats.put(TOTAL_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()) + fieldSize);
columnStats.put(TOTAL_UNCOMPRESSED_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString()) + fieldSize);
- if (!StringUtils.isNullOrEmpty(fieldVal)) {
+ if (!isNullOrEmpty(fieldVal)) {
// set the min value of the field
if (!columnStats.containsKey(MIN)) {
columnStats.put(MIN, fieldVal);
@@ -1043,4 +1068,17 @@ public class HoodieTableMetadataUtil {
}
});
}
+
+ private static Option<Schema> tryResolveSchemaForTable(HoodieTableMetaClient dataTableMetaClient) {
+ if (dataTableMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
+ return Option.empty();
+ }
+
+ TableSchemaResolver schemaResolver = new TableSchemaResolver(dataTableMetaClient);
+ try {
+ return Option.of(schemaResolver.getTableAvroSchema());
+ } catch (Exception e) {
+ throw new HoodieException("Failed to get latest columns for " + dataTableMetaClient.getBasePath(), e);
+ }
+ }
}