Posted to commits@hudi.apache.org by yi...@apache.org on 2022/04/03 00:16:26 UTC
[hudi] branch master updated: [HUDI-3664] Fixing Column Stats Index composition (#5181)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cc3737b [HUDI-3664] Fixing Column Stats Index composition (#5181)
cc3737b is described below
commit cc3737be506475c11c12471be6d0296ea14c7f39
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Sat Apr 2 17:15:52 2022 -0700
[HUDI-3664] Fixing Column Stats Index composition (#5181)
Co-authored-by: Sagar Sumit <sa...@gmail.com>
---
azure-pipelines.yml | 22 +-
.../hudi/cli/commands/TestCleansCommand.java | 9 +-
.../hudi/cli/commands/TestRollbacksCommand.java | 8 +
.../apache/hudi/index/bloom/HoodieBloomIndex.java | 13 +-
.../org/apache/hudi/io/HoodieAppendHandle.java | 35 +-
.../scala/org/apache/hudi/HoodieSparkUtils.scala | 3 +-
.../org/apache/hudi/client/TestClientRollback.java | 9 +-
.../functional/TestHoodieBackedMetadata.java | 96 +++--
.../functional/TestHoodieBackedTableMetadata.java | 26 +-
.../java/org/apache/hudi/table/TestCleaner.java | 1 +
.../table/action/compact/TestInlineCompaction.java | 4 +-
.../table/functional/TestCleanPlanExecutor.java | 21 +-
.../hudi/testutils/HoodieClientTestHarness.java | 44 ++-
hudi-common/pom.xml | 6 +
hudi-common/src/main/avro/HoodieMetadata.avsc | 186 +++++++++-
.../apache/hudi/avro/ConvertingGenericData.java | 144 ++++++++
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 106 +++++-
.../apache/hudi/avro/HoodieAvroWriteSupport.java | 2 +-
.../java/org/apache/hudi/common/fs/FSUtils.java | 14 +-
.../common/model/HoodieColumnRangeMetadata.java | 34 --
.../hudi/common/model/HoodieCommitMetadata.java | 5 +-
.../hudi/common/model/HoodieDeltaWriteStat.java | 20 +-
.../org/apache/hudi/common/util/DateTimeUtils.java | 29 ++
.../hudi/metadata/HoodieMetadataPayload.java | 177 ++++++++-
.../hudi/metadata/HoodieTableMetadataUtil.java | 398 ++++++++++++++-------
.../hudi/metadata/MetadataPartitionType.java | 2 +
.../org/apache/hudi/avro/TestHoodieAvroUtils.java | 43 ++-
.../common/functional/TestHoodieLogFormat.java | 116 +++---
.../hudi/common/testutils/FileCreateUtils.java | 19 +-
.../hudi/common/testutils/HoodieTestTable.java | 8 +-
.../org/apache/hudi/ColumnStatsIndexSupport.scala | 216 +++++++++++
.../scala/org/apache/hudi/HoodieFileIndex.scala | 64 +---
.../spark/{ => sql}/HoodieSparkTypeUtils.scala | 11 +-
.../spark/{ => sql}/HoodieUnsafeRDDUtils.scala | 22 +-
.../apache/spark/sql/hudi/DataSkippingUtils.scala | 2 +-
.../org/apache/hudi}/ColumnStatsIndexHelper.java | 65 +---
.../index/zorder/column-stats-index-table.json | 4 +
.../zorder/updated-column-stats-index-table.json | 8 +
.../index/zorder/z-index-table-merged.json | 8 -
.../test/resources/index/zorder/z-index-table.json | 4 -
.../org/apache/hudi/TestDataSkippingUtils.scala | 52 ++-
.../hudi/functional/TestColumnStatsIndex.scala | 236 +++++-------
.../TestMetadataTableWithSparkDataSource.scala | 17 +-
.../functional/TestParquetColumnProjection.scala | 3 +-
.../benchmark/SpaceCurveOptimizeBenchmark.scala | 4 +-
.../sql/HoodieSpark2CatalystExpressionUtils.scala | 2 +-
.../org/apache/spark/sql/avro/AvroSerializer.scala | 60 +++-
.../HoodieSpark3_1CatalystExpressionUtils.scala | 2 +-
.../org/apache/spark/sql/avro/AvroSerializer.scala | 59 ++-
.../HoodieSpark3_2CatalystExpressionUtils.scala | 2 +-
.../org/apache/spark/sql/avro/AvroSerializer.scala | 59 ++-
.../utilities/HoodieMetadataTableValidator.java | 35 +-
52 files changed, 1781 insertions(+), 754 deletions(-)
diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index f1b25db..8ca54c1 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -43,7 +43,7 @@ stages:
options: -T 2.5C -DskipTests
publishJUnitResults: false
jdkVersionOption: '1.8'
- mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+ mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
- task: Maven@3
displayName: UT common flink client/spark-client
inputs:
@@ -52,7 +52,7 @@ stages:
options: -Punit-tests -pl hudi-common,hudi-flink-datasource/hudi-flink,hudi-client/hudi-spark-client
publishJUnitResults: false
jdkVersionOption: '1.8'
- mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+ mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
- task: Maven@3
displayName: FT common flink
inputs:
@@ -61,7 +61,7 @@ stages:
options: -Pfunctional-tests -pl hudi-common,hudi-flink-datasource/hudi-flink
publishJUnitResults: false
jdkVersionOption: '1.8'
- mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+ mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
- job: UT_FT_2
displayName: FT client/spark-client
timeoutInMinutes: '90'
@@ -74,7 +74,7 @@ stages:
options: -T 2.5C -DskipTests
publishJUnitResults: false
jdkVersionOption: '1.8'
- mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+ mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
- task: Maven@3
displayName: FT client/spark-client
inputs:
@@ -83,7 +83,7 @@ stages:
options: -Pfunctional-tests -pl hudi-client/hudi-spark-client
publishJUnitResults: false
jdkVersionOption: '1.8'
- mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+ mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
- job: UT_FT_3
displayName: UT FT clients & cli & utilities & sync/hive-sync
timeoutInMinutes: '90'
@@ -96,7 +96,7 @@ stages:
options: -T 2.5C -DskipTests
publishJUnitResults: false
jdkVersionOption: '1.8'
- mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+ mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
- task: Maven@3
displayName: UT clients & cli & utilities & sync/hive-sync
inputs:
@@ -105,7 +105,7 @@ stages:
options: -Punit-tests -pl hudi-client/hudi-client-common,hudi-client/hudi-flink-client,hudi-client/hudi-java-client,hudi-cli,hudi-utilities,hudi-sync/hudi-hive-sync
publishJUnitResults: false
jdkVersionOption: '1.8'
- mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+ mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
- task: Maven@3
displayName: FT clients & cli & utilities & sync/hive-sync
inputs:
@@ -114,7 +114,7 @@ stages:
options: -Pfunctional-tests -pl hudi-client/hudi-client-common,hudi-client/hudi-flink-client,hudi-client/hudi-java-client,hudi-cli,hudi-utilities,hudi-sync/hudi-hive-sync
publishJUnitResults: false
jdkVersionOption: '1.8'
- mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+ mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
- job: UT_FT_4
displayName: UT FT other modules
timeoutInMinutes: '90'
@@ -127,7 +127,7 @@ stages:
options: -T 2.5C -DskipTests
publishJUnitResults: false
jdkVersionOption: '1.8'
- mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+ mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
- task: Maven@3
displayName: UT other modules
inputs:
@@ -136,7 +136,7 @@ stages:
options: -Punit-tests -pl $(EXCLUDE_TESTED_MODULES)
publishJUnitResults: false
jdkVersionOption: '1.8'
- mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+ mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
- task: Maven@3
displayName: FT other modules
inputs:
@@ -145,7 +145,7 @@ stages:
options: -Pfunctional-tests -pl $(EXCLUDE_TESTED_MODULES)
publishJUnitResults: false
jdkVersionOption: '1.8'
- mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
+ mavenOptions: '-Xmx4g $(MAVEN_OPTS)'
- job: IT
displayName: IT modules
timeoutInMinutes: '90'
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java
index c475c63..cac4f13 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java
@@ -27,6 +27,7 @@ import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -35,6 +36,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.Option;
import org.apache.hadoop.conf.Configuration;
@@ -47,8 +49,10 @@ import org.springframework.shell.core.CommandResult;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -94,8 +98,11 @@ public class TestCleansCommand extends CLIFunctionalTestHarness {
// Inflight Compaction
HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath,
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), conf);
+
+ Map<String, String> extraCommitMetadata =
+ Collections.singletonMap(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.PHONY_TABLE_SCHEMA);
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, conf, fileId1, fileId2,
- Option.empty(), Option.empty());
+ Option.empty(), Option.empty(), extraCommitMetadata);
}
metaClient = HoodieTableMetaClient.reload(metaClient);
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java
index 0aec7c5..cf4faf2 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java
@@ -26,6 +26,7 @@ import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -83,6 +84,13 @@ public class TestRollbacksCommand extends CLIFunctionalTestHarness {
};
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(tablePath)
+ .withMetadataConfig(
+ // Column Stats Index is disabled, since these tests construct tables which are
+ // not valid (empty commit metadata, etc)
+ HoodieMetadataConfig.newBuilder()
+ .withMetadataIndexColumnStats(false)
+ .build()
+ )
.withRollbackUsingMarkers(false)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
HoodieMetadataTestTable.of(metaClient, SparkHoodieBackedTableMetadataWriter.create(
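Several tests in this commit follow the same pattern: tables fabricated with empty or invalid commit metadata cannot feed the new Column Stats Index, so the tests disable it explicitly. For a real table the same builder is the opt-in switch; a minimal sketch using only the builder calls visible in this diff (the path is illustrative):

    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table")                // illustrative base path
        .withMetadataConfig(
            HoodieMetadataConfig.newBuilder()
                .enable(true)                       // metadata table on
                .withMetadataIndexColumnStats(true) // build the column_stats partition
                .build())
        .build();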
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index 522bbb3..8897152 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -40,7 +40,6 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.io.HoodieRangeInfoHandle;
import org.apache.hudi.table.HoodieTable;
-
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -55,6 +54,7 @@ import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
+import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getCompletedMetadataPartitions;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
@@ -206,7 +206,7 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
* @return List of partition and file column range info pairs
*/
protected List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(
- List<String> partitions, final HoodieEngineContext context, final HoodieTable hoodieTable) {
+ List<String> partitions, final HoodieEngineContext context, final HoodieTable<?, ?, ?, ?> hoodieTable) {
// also obtain file ranges, if range pruning is enabled
context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices");
@@ -221,15 +221,16 @@ public class HoodieBloomIndex extends HoodieIndex<Object, Object> {
return Stream.empty();
}
try {
- Map<Pair<String, String>, HoodieMetadataColumnStats> fileToColumnStatsMap = hoodieTable
- .getMetadataTable().getColumnStats(partitionFileNameList, keyField);
+ Map<Pair<String, String>, HoodieMetadataColumnStats> fileToColumnStatsMap =
+ hoodieTable.getMetadataTable().getColumnStats(partitionFileNameList, keyField);
List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>();
for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) {
result.add(Pair.of(entry.getKey().getLeft(),
new BloomIndexFileInfo(
FSUtils.getFileId(entry.getKey().getRight()),
- entry.getValue().getMinValue(),
- entry.getValue().getMaxValue()
+ // NOTE: Here we assume that the type of the primary key field is string
+ (String) unwrapStatisticValueWrapper(entry.getValue().getMinValue()),
+ (String) unwrapStatisticValueWrapper(entry.getValue().getMaxValue())
)));
}
return result.stream();
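Since this change, the min/max statistics returned from the metadata table are typed Avro wrapper records rather than plain strings, so callers must unwrap them before use. A minimal sketch of that step (the cast assumes a string-typed record key, as the NOTE in the diff above points out):

    import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
    import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper;

    static String minRecordKey(HoodieMetadataColumnStats stats) {
      // Unwrap e.g. a StringWrapper record back to its raw value.
      return (String) unwrapStatisticValueWrapper(stats.getMinValue());
    }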
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index efae458..4ab4be3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -18,6 +18,10 @@
package org.apache.hudi.io;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -56,11 +60,6 @@ import org.apache.hudi.exception.HoodieAppendException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -76,8 +75,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.accumulateColumnRanges;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.aggregateColumnStats;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRangeMetadata;
/**
* IO Operation to append data onto an existing file.
@@ -349,26 +347,21 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
if (config.isMetadataColumnStatsIndexEnabled()) {
final List<Schema.Field> fieldsToIndex;
- if (!StringUtils.isNullOrEmpty(config.getColumnsEnabledForColumnStatsIndex())) {
+ if (StringUtils.isNullOrEmpty(config.getColumnsEnabledForColumnStatsIndex())) {
+ // If column stats index is enabled but columns not configured then we assume that all columns should be indexed
+ fieldsToIndex = writeSchemaWithMetaFields.getFields();
+ } else {
Set<String> columnsToIndex = Stream.of(config.getColumnsEnabledForColumnStatsIndex().split(","))
.map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toSet());
+
fieldsToIndex = writeSchemaWithMetaFields.getFields().stream()
.filter(field -> columnsToIndex.contains(field.name())).collect(Collectors.toList());
- } else {
- // if column stats index is enabled but columns not configured then we assume that all columns should be indexed
- fieldsToIndex = writeSchemaWithMetaFields.getFields();
}
- Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = stat.getRecordsStats().isPresent()
- ? stat.getRecordsStats().get().getStats() : new HashMap<>();
- final String filePath = stat.getPath();
- // initialize map of column name to map of stats name to stats value
- Map<String, Map<String, Object>> columnToStats = new HashMap<>();
- fieldsToIndex.forEach(field -> columnToStats.putIfAbsent(field.name(), new HashMap<>()));
- // collect stats for columns at once per record and keep iterating through every record to eventually find col stats for all fields.
- recordList.forEach(record -> aggregateColumnStats(record, fieldsToIndex, columnToStats, config.isConsistentLogicalTimestampEnabled()));
- fieldsToIndex.forEach(field -> accumulateColumnRanges(field, filePath, columnRangeMap, columnToStats));
- stat.setRecordsStats(new HoodieDeltaWriteStat.RecordsStats<>(columnRangeMap));
+ Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangesMetadataMap =
+ collectColumnRangeMetadata(recordList, fieldsToIndex, stat.getPath());
+
+ stat.setRecordsStats(columnRangesMetadataMap);
}
resetWriteCounts();
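collectColumnRangeMetadata collapses the old accumulate/aggregate two-step into a single pass over the appended records (its implementation is in HoodieTableMetadataUtil, changed later in this commit). A simplified sketch of the per-column computation, tracking only min/max under illustrative names (the real code also tracks null counts and sizes, and wraps results in HoodieColumnRangeMetadata):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;

    static Map<String, Comparable[]> columnRanges(List<GenericRecord> records,
                                                  List<Schema.Field> fields) {
      Map<String, Comparable[]> ranges = new HashMap<>(); // column -> {min, max}
      for (GenericRecord record : records) {
        for (Schema.Field field : fields) {
          Comparable value = (Comparable) record.get(field.name());
          if (value == null) {
            continue; // the real code counts nulls instead of skipping them
          }
          ranges.merge(field.name(), new Comparable[] {value, value}, (old, cur) -> {
            if (cur[0].compareTo(old[0]) < 0) {
              old[0] = cur[0];
            }
            if (cur[1].compareTo(old[1]) > 0) {
              old[1] = cur[1];
            }
            return old;
          });
        }
      }
      return ranges;
    }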
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index b46995e..57eb32f 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -31,12 +31,11 @@ import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator}
import org.apache.spark.SPARK_VERSION
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
-import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.sql.{DataFrame, SparkSession}
import java.util.Properties
import org.apache.hudi.internal.schema.InternalSchema
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index c06b0a0..d2dabc0 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -318,7 +318,14 @@ public class TestClientRollback extends HoodieClientTestBase {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withRollbackUsingMarkers(false)
- .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+ .withMetadataConfig(
+ HoodieMetadataConfig.newBuilder()
+ // Column Stats Index is disabled, since these tests construct tables which are
+ // not valid (empty commit metadata, invalid parquet files)
+ .withMetadataIndexColumnStats(false)
+ .enable(enableMetadataTable)
+ .build()
+ )
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 5c73d96..61c2775 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -154,6 +154,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
@Tag("functional")
public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
@@ -859,30 +860,31 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
}
Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
- HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema);
-
- while (logFileReader.hasNext()) {
- HoodieLogBlock logBlock = logFileReader.next();
- if (logBlock instanceof HoodieDataBlock) {
- try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
- recordItr.forEachRemaining(indexRecord -> {
- final GenericRecord record = (GenericRecord) indexRecord;
- if (enableMetaFields) {
- // Metadata table records should have meta fields!
- assertNotNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
- assertNotNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
- } else {
- // Metadata table records should not have meta fields!
- assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
- assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
- }
-
- final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME));
- assertFalse(key.isEmpty());
- if (enableMetaFields) {
- assertTrue(key.equals(String.valueOf(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD))));
- }
- });
+
+ try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema)) {
+ while (logFileReader.hasNext()) {
+ HoodieLogBlock logBlock = logFileReader.next();
+ if (logBlock instanceof HoodieDataBlock) {
+ try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
+ recordItr.forEachRemaining(indexRecord -> {
+ final GenericRecord record = (GenericRecord) indexRecord;
+ if (enableMetaFields) {
+ // Metadata table records should have meta fields!
+ assertNotNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+ assertNotNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
+ } else {
+ // Metadata table records should not have meta fields!
+ assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+ assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
+ }
+
+ final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME));
+ assertFalse(key.isEmpty());
+ if (enableMetaFields) {
+ assertTrue(key.equals(String.valueOf(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD))));
+ }
+ });
+ }
}
}
}
@@ -2214,11 +2216,57 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
assertTrue(latestSlices.size()
<= (numFileVersions * metadataEnabledPartitionTypes.get(partition).getFileGroupCount()), "Should limit file slice to "
+ numFileVersions + " per file group, but was " + latestSlices.size());
+ List<HoodieLogFile> logFiles = latestSlices.get(0).getLogFiles().collect(Collectors.toList());
+ try {
+ if (MetadataPartitionType.FILES.getPartitionPath().equals(partition)) {
+ verifyMetadataRawRecords(table, logFiles, false);
+ }
+ if (MetadataPartitionType.COLUMN_STATS.getPartitionPath().equals(partition)) {
+ verifyMetadataColumnStatsRecords(logFiles);
+ }
+ } catch (IOException e) {
+ LOG.error("Metadata record validation failed", e);
+ fail("Metadata record validation failed");
+ }
});
LOG.info("Validation time=" + timer.endTimer());
}
+ private void verifyMetadataColumnStatsRecords(List<HoodieLogFile> logFiles) throws IOException {
+ for (HoodieLogFile logFile : logFiles) {
+ FileStatus[] fsStatus = fs.listStatus(logFile.getPath());
+ MessageType writerSchemaMsg = TableSchemaResolver.readSchemaFromLogFile(fs, logFile.getPath());
+ if (writerSchemaMsg == null) {
+ // not a data block
+ continue;
+ }
+
+ Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
+ try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema)) {
+ while (logFileReader.hasNext()) {
+ HoodieLogBlock logBlock = logFileReader.next();
+ if (logBlock instanceof HoodieDataBlock) {
+ try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
+ recordItr.forEachRemaining(indexRecord -> {
+ final GenericRecord record = (GenericRecord) indexRecord;
+ final GenericRecord colStatsRecord = (GenericRecord) record.get(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS);
+ assertNotNull(colStatsRecord);
+ assertNotNull(colStatsRecord.get(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME));
+ assertNotNull(colStatsRecord.get(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT));
+ /**
+ * TODO: some types of field may have null min/max as these statistics are only supported for primitive types
+ * assertNotNull(colStatsRecord.get(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE));
+ * assertNotNull(colStatsRecord.get(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE));
+ */
+ });
+ }
+ }
+ }
+ }
+ }
+ }
+
/**
* Returns the list of all files in the dataset by iterating over the metadata table.
*
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 70f54b1..323724a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -288,19 +288,19 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
}
Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
- HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema);
-
- while (logFileReader.hasNext()) {
- HoodieLogBlock logBlock = logFileReader.next();
- if (logBlock instanceof HoodieDataBlock) {
- try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
- recordItr.forEachRemaining(indexRecord -> {
- final GenericRecord record = (GenericRecord) indexRecord;
- assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
- assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
- final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME));
- assertFalse(key.isEmpty());
- });
+ try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema)) {
+ while (logFileReader.hasNext()) {
+ HoodieLogBlock logBlock = logFileReader.next();
+ if (logBlock instanceof HoodieDataBlock) {
+ try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) logBlock).getRecordItr()) {
+ recordItr.forEachRemaining(indexRecord -> {
+ final GenericRecord record = (GenericRecord) indexRecord;
+ assertNull(record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD));
+ assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
+ final String key = String.valueOf(record.get(HoodieMetadataPayload.KEY_FIELD_NAME));
+ assertFalse(key.isEmpty());
+ });
+ }
}
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index dd8a83b..7e774c3 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -1509,6 +1509,7 @@ public class TestCleaner extends HoodieClientTestBase {
protected static HoodieCommitMetadata generateCommitMetadata(
String instantTime, Map<String, List<String>> partitionToFilePaths) {
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+ metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.PHONY_TABLE_SCHEMA);
partitionToFilePaths.forEach((partitionPath, fileList) -> fileList.forEach(f -> {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setPartitionPath(partitionPath);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
index 310ff4f..7f1046b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -119,7 +119,7 @@ public class TestInlineCompaction extends CompactionTestBase {
@Test
public void testSuccessfulCompactionBasedOnNumOrTime() throws Exception {
// Given: make three commits
- HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20, CompactionTriggerStrategy.NUM_OR_TIME);
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 60, CompactionTriggerStrategy.NUM_OR_TIME);
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
List<HoodieRecord> records = dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
@@ -134,7 +134,7 @@ public class TestInlineCompaction extends CompactionTestBase {
assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants());
// 4th commit, that will trigger compaction because reach the time elapsed
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
- finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
+ finalInstant = HoodieActiveTimeline.createNewInstantTime(60000);
createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
index 60367f8..90d0f88 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
@@ -111,7 +111,10 @@ public class TestCleanPlanExecutor extends TestCleaner {
boolean simulateFailureRetry, boolean simulateMetadataFailure,
boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
- .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
+ .withMetadataConfig(
+ HoodieMetadataConfig.newBuilder()
+ .withAssumeDatePartitioning(true)
+ .build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withIncrementalCleaningMode(enableIncrementalClean)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
@@ -384,7 +387,13 @@ public class TestCleanPlanExecutor extends TestCleaner {
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
- .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
+ .withMetadataConfig(
+ HoodieMetadataConfig.newBuilder()
+ .withAssumeDatePartitioning(true)
+ // Column Stats Index is disabled, since these tests construct tables which are
+ // not valid (empty commit metadata, invalid parquet files)
+ .withMetadataIndexColumnStats(false)
+ .build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
.build();
@@ -422,7 +431,13 @@ public class TestCleanPlanExecutor extends TestCleaner {
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
- .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
+ .withMetadataConfig(
+ HoodieMetadataConfig.newBuilder()
+ .withAssumeDatePartitioning(true)
+ // Column Stats Index is disabled, since these tests construct tables which are
+ // not valid (empty commit metadata, invalid parquet files)
+ .withMetadataIndexColumnStats(false)
+ .build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build())
.build();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index ab3d504..1b41769 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -17,6 +17,13 @@
package org.apache.hudi.testutils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -25,6 +32,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieCleanStat;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
@@ -59,19 +67,12 @@ import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.timeline.service.TimelineService;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -82,6 +83,7 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
+import scala.Tuple2;
import java.io.IOException;
import java.io.Serializable;
@@ -91,14 +93,14 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.function.Function;
import java.util.stream.Collectors;
-import scala.Tuple2;
-
import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -571,7 +573,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
}
});
if (doFullValidation) {
- runFullValidation(writeConfig, metadataTableBasePath, engineContext);
+ runFullValidation(table.getConfig().getMetadataConfig(), writeConfig, metadataTableBasePath, engineContext);
}
LOG.info("Validation time=" + timer.endTimer());
@@ -644,7 +646,10 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
assertEquals(metadataFilenames.size(), numFiles);
}
- private void runFullValidation(HoodieWriteConfig writeConfig, String metadataTableBasePath, HoodieSparkEngineContext engineContext) {
+ private void runFullValidation(HoodieMetadataConfig metadataConfig,
+ HoodieWriteConfig writeConfig,
+ String metadataTableBasePath,
+ HoodieSparkEngineContext engineContext) {
HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(writeConfig);
assertNotNull(metadataWriter, "MetadataWriter should have been initialized");
@@ -666,16 +671,25 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
// in the .hoodie folder.
List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(engineContext, HoodieTableMetadata.getMetadataTableBasePath(basePath),
false, false);
- Assertions.assertEquals(metadataWriter.getEnabledPartitionTypes().size(), metadataTablePartitions.size());
+
+ List<MetadataPartitionType> enabledPartitionTypes = metadataWriter.getEnabledPartitionTypes();
+
+ Assertions.assertEquals(enabledPartitionTypes.size(), metadataTablePartitions.size());
+
+ Map<String, MetadataPartitionType> partitionTypeMap = enabledPartitionTypes.stream()
+ .collect(Collectors.toMap(MetadataPartitionType::getPartitionPath, Function.identity()));
// Metadata table should automatically compact and clean
// versions are +1 as autoClean / compaction happens end of commits
int numFileVersions = metadataWriteConfig.getCleanerFileVersionsRetained() + 1;
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metadataMetaClient, metadataMetaClient.getActiveTimeline());
metadataTablePartitions.forEach(partition -> {
+ MetadataPartitionType partitionType = partitionTypeMap.get(partition);
+
List<FileSlice> latestSlices = fsView.getLatestFileSlices(partition).collect(Collectors.toList());
- assertTrue(latestSlices.stream().map(FileSlice::getBaseFile).count() <= 1, "Should have a single latest base file");
- assertTrue(latestSlices.size() <= 1, "Should have a single latest file slice");
+
+ assertTrue(latestSlices.stream().map(FileSlice::getBaseFile).filter(Objects::nonNull).count() <= partitionType.getFileGroupCount(), "Should have a single latest base file");
+ assertTrue(latestSlices.size() <= partitionType.getFileGroupCount(), "Should have a single latest file slice");
assertTrue(latestSlices.size() <= numFileVersions, "Should limit file slice to "
+ numFileVersions + " but was " + latestSlices.size());
});
diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index ece8d24..0fc91b2 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -265,5 +265,11 @@
<version>1.8.0</version>
</dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git a/hudi-common/src/main/avro/HoodieMetadata.avsc b/hudi-common/src/main/avro/HoodieMetadata.avsc
index 4b458bd..a8d7ca7 100644
--- a/hudi-common/src/main/avro/HoodieMetadata.avsc
+++ b/hudi-common/src/main/avro/HoodieMetadata.avsc
@@ -122,17 +122,195 @@
"doc": "Minimum value in the range. Based on user data table schema, we can convert this to appropriate type",
"name": "minValue",
"type": [
+ // Those types should be aligned with Parquet `Statistics` impl
+ // making sure that we implement semantic consistent across file formats
+ //
+ // NOTE: Other logical types (decimal, date, timestamp, etc) will be converted
+ // into one of the following types, making sure that their corresponding
+ // ordering is preserved
"null",
- "string"
- ]
+ {
+ "namespace": "org.apache.hudi.avro.model",
+ "type": "record",
+ "name": "BooleanWrapper",
+ "doc": "A record wrapping boolean type to be able to be used it w/in Avro's Union",
+ "fields": [
+ {
+ "type": "boolean",
+ "name": "value"
+ }
+ ]
+ },
+ {
+ "namespace": "org.apache.hudi.avro.model",
+ "type": "record",
+ "name": "IntWrapper",
+ "doc": "A record wrapping int type to be able to be used it w/in Avro's Union",
+ "fields": [
+ {
+ "type": "int",
+ "name": "value"
+ }
+ ]
+ },
+ {
+ "namespace": "org.apache.hudi.avro.model",
+ "type": "record",
+ "name": "LongWrapper",
+ "doc": "A record wrapping long type to be able to be used it w/in Avro's Union",
+ "fields": [
+ {
+ "type": "long",
+ "name": "value"
+ }
+ ]
+ },
+ {
+ "namespace": "org.apache.hudi.avro.model",
+ "type": "record",
+ "name": "FloatWrapper",
+ "doc": "A record wrapping float type to be able to be used it w/in Avro's Union",
+ "fields": [
+ {
+ "type": "float",
+ "name": "value"
+ }
+ ]
+ },
+ {
+ "namespace": "org.apache.hudi.avro.model",
+ "type": "record",
+ "name": "DoubleWrapper",
+ "doc": "A record wrapping double type to be able to be used it w/in Avro's Union",
+ "fields": [
+ {
+ "type": "double",
+ "name": "value"
+ }
+ ]
+ },
+ {
+ "namespace": "org.apache.hudi.avro.model",
+ "type": "record",
+ "name": "BytesWrapper",
+ "doc": "A record wrapping bytes type to be able to be used it w/in Avro's Union",
+ "fields": [
+ {
+ "type": "bytes",
+ "name": "value"
+ }
+ ]
+ },
+ {
+ "namespace": "org.apache.hudi.avro.model",
+ "type": "record",
+ "name": "StringWrapper",
+ "doc": "A record wrapping string type to be able to be used it w/in Avro's Union",
+ "fields": [
+ {
+ "type": "string",
+ "name": "value"
+ }
+ ]
+ },
+ {
+ "namespace": "org.apache.hudi.avro.model",
+ "type": "record",
+ "name": "DateWrapper",
+ "doc": "A record wrapping Date logical type to be able to be used it w/in Avro's Union",
+ "fields": [
+ {
+ "type": {
+ "type": "int"
+ // NOTE: Due to breaking changes in code-gen b/w Avro 1.8.2 and 1.10, we can't
+ // rely on logical types to do proper encoding of the native Java types,
+ // and hereby have to encode statistic manually
+ //"logicalType": "date"
+ },
+ "name": "value"
+ }
+ ]
+ },
+ {
+ "namespace": "org.apache.hudi.avro.model",
+ "type": "record",
+ "name": "DecimalWrapper",
+ "doc": "A record wrapping Decimal logical type to be able to be used it w/in Avro's Union",
+ "fields": [
+ {
+ "type": {
+ "type": "bytes",
+ "logicalType": "decimal",
+ // NOTE: This is equivalent to Spark's [[DoubleDecimal]] and should
+ // be enough for almost any possible use-cases
+ "precision": 30,
+ "scale": 15
+ },
+ "name": "value"
+ }
+ ]
+ },
+ {
+ "namespace": "org.apache.hudi.avro.model",
+ "type": "record",
+ "name": "TimeMicrosWrapper",
+ "doc": "A record wrapping Time-micros logical type to be able to be used it w/in Avro's Union",
+ "fields": [
+ {
+ "type": {
+ "type": "long",
+ "logicalType": "time-micros"
+ },
+ "name": "value"
+
+ }
+ ]
+ },
+ {
+ "namespace": "org.apache.hudi.avro.model",
+ "type": "record",
+ "name": "TimestampMicrosWrapper",
+ "doc": "A record wrapping Timestamp-micros logical type to be able to be used it w/in Avro's Union",
+ "fields": [
+ {
+ "type": {
+ "type": "long"
+ // NOTE: Due to breaking changes in code-gen b/w Avro 1.8.2 and 1.10, we can't
+ // rely on logical types to do proper encoding of the native Java types,
+ // and hereby have to encode statistic manually
+ //"logicalType": "timestamp-micros"
+ },
+ "name": "value"
+ }
+ ]
+ }
+ ],
+ "default": null
},
{
"doc": "Maximum value in the range. Based on user data table schema, we can convert it to appropriate type",
"name": "maxValue",
"type": [
+ // Those types should be aligned with Parquet `Statistics` impl
+ // making sure that we implement semantic consistent across file formats
+ //
+ // NOTE: Other logical types (decimal, date, timestamp, etc) will be converted
+ // into one of the following types, making sure that their corresponding
+ // ordering is preserved
"null",
- "string"
- ]
+ "org.apache.hudi.avro.model.BooleanWrapper",
+ "org.apache.hudi.avro.model.IntWrapper",
+ "org.apache.hudi.avro.model.LongWrapper",
+ "org.apache.hudi.avro.model.FloatWrapper",
+ "org.apache.hudi.avro.model.DoubleWrapper",
+ "org.apache.hudi.avro.model.BytesWrapper",
+ "org.apache.hudi.avro.model.StringWrapper",
+ "org.apache.hudi.avro.model.DateWrapper",
+ "org.apache.hudi.avro.model.DecimalWrapper",
+ "org.apache.hudi.avro.model.TimeMicrosWrapper",
+ "org.apache.hudi.avro.model.TimestampMicrosWrapper"
+ ],
+ "default": null
},
{
"doc": "Total count of values",
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/ConvertingGenericData.java b/hudi-common/src/main/java/org/apache/hudi/avro/ConvertingGenericData.java
new file mode 100644
index 0000000..9d36e21
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/ConvertingGenericData.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.avro.Conversions;
+import org.apache.avro.Schema;
+import org.apache.avro.UnresolvedUnionException;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+
+import java.util.Map;
+
+/**
+ * Custom instance of the {@link GenericData} model incorporating conversions from the
+ * common Avro logical types like "decimal", "uuid", "date", "time-micros", "timestamp-micros"
+ *
+ * NOTE: Given that this code has to be interoperable w/ Spark 2 (which relies on Avro 1.8.2)
+ * this model can't support newer conversion introduced in Avro 1.10 at the moment
+ */
+public class ConvertingGenericData extends GenericData {
+
+ private static final Conversions.DecimalConversion DECIMAL_CONVERSION = new Conversions.DecimalConversion();
+ private static final Conversions.UUIDConversion UUID_CONVERSION = new Conversions.UUIDConversion();
+ private static final TimeConversions.DateConversion DATE_CONVERSION = new TimeConversions.DateConversion();
+ private static final TimeConversions.TimeMicrosConversion TIME_MICROS_CONVERSION = new TimeConversions.TimeMicrosConversion();
+ private static final TimeConversions.TimestampMicrosConversion TIMESTAMP_MICROS_CONVERSION = new TimeConversions.TimestampMicrosConversion();
+
+ // NOTE: Those are not supported in Avro 1.8.2
+ // TODO re-enable upon upgrading to 1.10
+ // private static final TimeConversions.TimestampMillisConversion TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.TimestampMillisConversion();
+ // private static final TimeConversions.TimeMillisConversion TIME_MILLIS_CONVERSION = new TimeConversions.TimeMillisConversion();
+ // private static final TimeConversions.LocalTimestampMillisConversion LOCAL_TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.LocalTimestampMillisConversion();
+ // private static final TimeConversions.LocalTimestampMicrosConversion LOCAL_TIMESTAMP_MICROS_CONVERSION = new TimeConversions.LocalTimestampMicrosConversion();
+
+ public static final GenericData INSTANCE = new ConvertingGenericData();
+
+ private ConvertingGenericData() {
+ addLogicalTypeConversion(DECIMAL_CONVERSION);
+ addLogicalTypeConversion(UUID_CONVERSION);
+ addLogicalTypeConversion(DATE_CONVERSION);
+ addLogicalTypeConversion(TIME_MICROS_CONVERSION);
+ addLogicalTypeConversion(TIMESTAMP_MICROS_CONVERSION);
+ // NOTE: Those are not supported in Avro 1.8.2
+ // TODO re-enable upon upgrading to 1.10
+ // addLogicalTypeConversion(TIME_MILLIS_CONVERSION);
+ // addLogicalTypeConversion(TIMESTAMP_MILLIS_CONVERSION);
+ // addLogicalTypeConversion(LOCAL_TIMESTAMP_MILLIS_CONVERSION);
+ // addLogicalTypeConversion(LOCAL_TIMESTAMP_MICROS_CONVERSION);
+ }
+
+ @Override
+ public boolean validate(Schema schema, Object datum) {
+ switch (schema.getType()) {
+ case RECORD:
+ if (!isRecord(datum)) {
+ return false;
+ }
+ for (Schema.Field f : schema.getFields()) {
+ if (!validate(f.schema(), getField(datum, f.name(), f.pos()))) {
+ return false;
+ }
+ }
+ return true;
+ case ENUM:
+ if (!isEnum(datum)) {
+ return false;
+ }
+ return schema.getEnumSymbols().contains(datum.toString());
+ case ARRAY:
+ if (!(isArray(datum))) {
+ return false;
+ }
+ for (Object element : getArrayAsCollection(datum)) {
+ if (!validate(schema.getElementType(), element)) {
+ return false;
+ }
+ }
+ return true;
+ case MAP:
+ if (!(isMap(datum))) {
+ return false;
+ }
+ @SuppressWarnings(value = "unchecked")
+ Map<Object, Object> map = (Map<Object, Object>) datum;
+ for (Map.Entry<Object, Object> entry : map.entrySet()) {
+ if (!validate(schema.getValueType(), entry.getValue())) {
+ return false;
+ }
+ }
+ return true;
+ case UNION:
+ try {
+ int i = resolveUnion(schema, datum);
+ return validate(schema.getTypes().get(i), datum);
+ } catch (UnresolvedUnionException e) {
+ return false;
+ }
+ case FIXED:
+ return (datum instanceof GenericFixed && ((GenericFixed) datum).bytes().length == schema.getFixedSize())
+ || DECIMAL_CONVERSION.getConvertedType().isInstance(datum);
+ case STRING:
+ return isString(datum)
+ || UUID_CONVERSION.getConvertedType().isInstance(datum);
+ case BYTES:
+ return isBytes(datum)
+ || DECIMAL_CONVERSION.getConvertedType().isInstance(datum);
+ case INT:
+ return isInteger(datum)
+ || DATE_CONVERSION.getConvertedType().isInstance(datum);
+ case LONG:
+ return isLong(datum)
+ || TIME_MICROS_CONVERSION.getConvertedType().isInstance(datum)
+ || TIMESTAMP_MICROS_CONVERSION.getConvertedType().isInstance(datum);
+ case FLOAT:
+ return isFloat(datum);
+ case DOUBLE:
+ return isDouble(datum);
+ case BOOLEAN:
+ return isBoolean(datum);
+ case NULL:
+ return datum == null;
+ default:
+ return false;
+ }
+ }
+}
+
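The practical effect of this model: validate() now accepts the Java objects produced by logical-type conversions (BigDecimal, UUID, dates) where stock GenericData only accepts the raw encoded types. A minimal usage sketch (schema built with standard Avro APIs; the value is illustrative):

    import java.math.BigDecimal;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    Schema decimalSchema = LogicalTypes.decimal(30, 15)
        .addToSchema(Schema.create(Schema.Type.BYTES));

    // Stock GenericData.get().validate(...) returns false here, since a BigDecimal
    // is not a ByteBuffer; the converting model recognizes the converted type.
    boolean ok = ConvertingGenericData.INSTANCE.validate(decimalSchema, new BigDecimal("12.5"));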
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 90344ce..237851c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -69,6 +69,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.HashMap;
import java.util.TimeZone;
import java.util.Iterator;
@@ -82,9 +83,8 @@ import static org.apache.avro.Schema.Type.UNION;
*/
public class HoodieAvroUtils {
- private static ThreadLocal<BinaryEncoder> reuseEncoder = ThreadLocal.withInitial(() -> null);
-
- private static ThreadLocal<BinaryDecoder> reuseDecoder = ThreadLocal.withInitial(() -> null);
+ private static final ThreadLocal<BinaryEncoder> BINARY_ENCODER = ThreadLocal.withInitial(() -> null);
+ private static final ThreadLocal<BinaryDecoder> BINARY_DECODER = ThreadLocal.withInitial(() -> null);
private static final long MILLIS_PER_DAY = 86400000L;
@@ -92,9 +92,9 @@ public class HoodieAvroUtils {
public static final Conversions.DecimalConversion DECIMAL_CONVERSION = new Conversions.DecimalConversion();
// As per https://avro.apache.org/docs/current/spec.html#names
- private static String INVALID_AVRO_CHARS_IN_NAMES = "[^A-Za-z0-9_]";
- private static String INVALID_AVRO_FIRST_CHAR_IN_NAMES = "[^A-Za-z_]";
- private static String MASK_FOR_INVALID_CHARS_IN_NAMES = "__";
+ private static final String INVALID_AVRO_CHARS_IN_NAMES = "[^A-Za-z0-9_]";
+ private static final String INVALID_AVRO_FIRST_CHAR_IN_NAMES = "[^A-Za-z_]";
+ private static final String MASK_FOR_INVALID_CHARS_IN_NAMES = "__";
// All metadata fields are optional strings.
public static final Schema METADATA_FIELD_SCHEMA =
@@ -110,10 +110,10 @@ public class HoodieAvroUtils {
}
public static <T extends IndexedRecord> byte[] indexedRecordToBytes(T record) {
- GenericDatumWriter<T> writer = new GenericDatumWriter<>(record.getSchema());
+ GenericDatumWriter<T> writer = new GenericDatumWriter<>(record.getSchema(), ConvertingGenericData.INSTANCE);
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
- BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, reuseEncoder.get());
- reuseEncoder.set(encoder);
+ BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, BINARY_ENCODER.get());
+ BINARY_ENCODER.set(encoder);
writer.write(record, encoder);
encoder.flush();
return out.toByteArray();
@@ -148,8 +148,8 @@ public class HoodieAvroUtils {
* Convert serialized bytes back into avro record.
*/
public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {
- BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, reuseDecoder.get());
- reuseDecoder.set(decoder);
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, BINARY_DECODER.get());
+ BINARY_DECODER.set(decoder);
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
return reader.read(null, decoder);
}
@@ -391,7 +391,7 @@ public class HoodieAvroUtils {
}
}
- if (!GenericData.get().validate(newSchema, newRecord)) {
+ if (!ConvertingGenericData.INSTANCE.validate(newSchema, newRecord)) {
throw new SchemaCompatibilityException(
"Unable to validate the rewritten record " + oldRecord + " against schema " + newSchema);
}
@@ -429,9 +429,13 @@ public class HoodieAvroUtils {
if (fieldValue != null) {
// In case field's value is a nested record, we have to rewrite it as well
- Object newFieldValue = fieldValue instanceof GenericRecord
- ? rewriteRecord((GenericRecord) fieldValue, resolveNullableSchema(field.schema()))
- : fieldValue;
+ Object newFieldValue;
+ if (fieldValue instanceof GenericRecord) {
+ GenericRecord record = (GenericRecord) fieldValue;
+ newFieldValue = rewriteRecord(record, resolveUnionSchema(field.schema(), record.getSchema().getFullName()));
+ } else {
+ newFieldValue = fieldValue;
+ }
newRecord.put(field.name(), newFieldValue);
} else if (field.defaultVal() instanceof JsonProperties.Null) {
newRecord.put(field.name(), null);
@@ -519,6 +523,56 @@ public class HoodieAvroUtils {
}
/**
+ * Get schema for the given field and record. Field can be nested, denoted by dot notation. e.g: a.b.c
+ *
+ * @param record - record containing the value of the given field
+ * @param fieldName - name of the field
+ * @return
+ */
+ public static Schema getNestedFieldSchemaFromRecord(GenericRecord record, String fieldName) {
+ String[] parts = fieldName.split("\\.");
+ GenericRecord valueNode = record;
+ int i = 0;
+ for (; i < parts.length; i++) {
+ String part = parts[i];
+ Object val = valueNode.get(part);
+
+ if (i == parts.length - 1) {
+ return resolveNullableSchema(valueNode.getSchema().getField(part).schema());
+ } else {
+ if (!(val instanceof GenericRecord)) {
+ throw new HoodieException("Cannot find a record at part value :" + part);
+ }
+ valueNode = (GenericRecord) val;
+ }
+ }
+ throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
+ }
+
+
+ /**
+ * Get schema for the given field and write schema. Field can be nested, denoted by dot notation. e.g: a.b.c
+ * Use this method when record is not available. Otherwise, prefer to use {@link #getNestedFieldSchemaFromRecord(GenericRecord, String)}
+ *
+ * @param writeSchema - write schema of the record
+ * @param fieldName - name of the field
+ * @return
+ */
+ public static Schema getNestedFieldSchemaFromWriteSchema(Schema writeSchema, String fieldName) {
+ String[] parts = fieldName.split("\\.");
+ Schema currentSchema = writeSchema;
+ for (int i = 0; i < parts.length; i++) {
+ Schema fieldSchema = currentSchema.getField(parts[i]).schema();
+
+ if (i == parts.length - 1) {
+ return resolveNullableSchema(fieldSchema);
+ }
+ // Descend into the nested record's schema (unwrapping a nullable union, if any);
+ // otherwise every path segment would be looked up against the top-level schema
+ currentSchema = resolveNullableSchema(fieldSchema);
+ }
+ throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
+ }
+
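As a usage sketch for the new helper (hypothetical schema; not part of the patch):

    String avsc = "{\"type\":\"record\",\"name\":\"Outer\",\"fields\":["
        + "{\"name\":\"inner\",\"type\":{\"type\":\"record\",\"name\":\"Inner\",\"fields\":["
        + "{\"name\":\"ts\",\"type\":[\"null\",\"long\"],\"default\":null}]}}]}";
    Schema writeSchema = new Schema.Parser().parse(avsc);
    // Descends "inner" -> "ts" and unwraps the [null, long] union, yielding Schema.Type.LONG
    Schema tsSchema = HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema(writeSchema, "inner.ts");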
+ /**
* Returns the string value of the given record {@code rec} and field {@code fieldName}.
* The field and value both could be missing.
*
@@ -653,7 +707,27 @@ public class HoodieAvroUtils {
return getRecordColumnValues(record, columns, schema.get(), consistentLogicalTimestampEnabled);
}
- private static Schema resolveNullableSchema(Schema schema) {
+ private static Schema resolveUnionSchema(Schema schema, String fieldSchemaFullName) {
+ if (schema.getType() != Schema.Type.UNION) {
+ return schema;
+ }
+
+ List<Schema> innerTypes = schema.getTypes();
+ Schema nonNullType =
+ innerTypes.stream()
+ .filter(it -> it.getType() != Schema.Type.NULL && Objects.equals(it.getFullName(), fieldSchemaFullName))
+ .findFirst()
+ .orElse(null);
+
+ if (nonNullType == null) {
+ throw new AvroRuntimeException(
+ String.format("Unsupported Avro UNION type %s: Only UNION of a null type and a non-null type is supported", schema));
+ }
+
+ return nonNullType;
+ }
+
+ public static Schema resolveNullableSchema(Schema schema) {
if (schema.getType() != Schema.Type.UNION) {
return schema;
}
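A quick sketch of the distinction between the two resolvers (illustrative only): resolveUnionSchema picks a specific record branch of a union by its full name, while resolveNullableSchema handles the common [null, T] shape:

    Schema nullableLong = Schema.createUnion(
        Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG)));
    // Returns the LONG branch; a union that is not of the [null, T] shape throws
    Schema resolved = HoodieAvroUtils.resolveNullableSchema(nullableLong);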
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
index 18827c6..020fcc2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWriteSupport.java
@@ -44,7 +44,7 @@ public class HoodieAvroWriteSupport extends AvroWriteSupport {
public static final String HOODIE_BLOOM_FILTER_TYPE_CODE = "hoodie_bloom_filter_type_code";
public HoodieAvroWriteSupport(MessageType schema, Schema avroSchema, Option<BloomFilter> bloomFilterOpt) {
- super(schema, avroSchema);
+ super(schema, avroSchema, ConvertingGenericData.INSTANCE);
this.bloomFilterOpt = bloomFilterOpt;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 7c9b7cc..86bb320 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -585,12 +586,21 @@ public class FSUtils {
}
public static Path getPartitionPath(String basePath, String partitionPath) {
- return getPartitionPath(new Path(basePath), partitionPath);
+ if (StringUtils.isNullOrEmpty(partitionPath)) {
+ return new Path(basePath);
+ }
+
+ // NOTE: We have to chop the leading "/" to make sure Hadoop does not treat
+ // it as an absolute path
+ String properPartitionPath = partitionPath.startsWith("/")
+ ? partitionPath.substring(1)
+ : partitionPath;
+ return getPartitionPath(new Path(basePath), properPartitionPath);
}
public static Path getPartitionPath(Path basePath, String partitionPath) {
// For a non-partitioned table, return only the base path
- return ((partitionPath == null) || (partitionPath.isEmpty())) ? basePath : new Path(basePath, partitionPath);
+ return StringUtils.isNullOrEmpty(partitionPath) ? basePath : new Path(basePath, partitionPath);
}
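The intended behavior, sketched with illustrative paths:

    // Empty/null partition path: non-partitioned table, the base path is returned as-is
    FSUtils.getPartitionPath("s3://bucket/tbl", "");            // s3://bucket/tbl
    // The leading "/" is chopped so Hadoop does not treat it as an absolute path
    FSUtils.getPartitionPath("s3://bucket/tbl", "/2022/04/02"); // s3://bucket/tbl/2022/04/02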
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
index 2afbd19..e3c5a70 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
@@ -20,10 +20,7 @@ package org.apache.hudi.common.model;
import javax.annotation.Nullable;
import java.io.Serializable;
-import java.util.Comparator;
import java.util.Objects;
-import java.util.function.BiFunction;
-import java.util.stream.Stream;
/**
* Hoodie metadata for the column range of data stored in columnar format (like Parquet)
@@ -45,23 +42,6 @@ public class HoodieColumnRangeMetadata<T extends Comparable> implements Serializ
private final long totalSize;
private final long totalUncompressedSize;
- public static final BiFunction<HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>, HoodieColumnRangeMetadata<Comparable>> COLUMN_RANGE_MERGE_FUNCTION =
- (oldColumnRange, newColumnRange) -> new HoodieColumnRangeMetadata<Comparable>(
- newColumnRange.getFilePath(),
- newColumnRange.getColumnName(),
- (Comparable) Stream.of(oldColumnRange.getMinValue(), newColumnRange.getMinValue())
- .filter(Objects::nonNull)
- .min(Comparator.naturalOrder())
- .orElse(null),
- (Comparable) Stream.of(oldColumnRange.getMinValue(), newColumnRange.getMinValue())
- .filter(Objects::nonNull)
- .max(Comparator.naturalOrder()).orElse(null),
- oldColumnRange.getNullCount() + newColumnRange.getNullCount(),
- oldColumnRange.getValueCount() + newColumnRange.getValueCount(),
- oldColumnRange.getTotalSize() + newColumnRange.getTotalSize(),
- oldColumnRange.getTotalUncompressedSize() + newColumnRange.getTotalUncompressedSize()
- );
-
private HoodieColumnRangeMetadata(String filePath,
String columnName,
@Nullable T minValue,
@@ -168,18 +148,4 @@ public class HoodieColumnRangeMetadata<T extends Comparable> implements Serializ
String columnName) {
return new HoodieColumnRangeMetadata<>(filePath, columnName, null, null, -1, -1, -1, -1);
}
-
- /**
- * Statistics that is collected in {@link org.apache.hudi.metadata.MetadataPartitionType#COLUMN_STATS} index.
- */
- public static final class Stats {
- public static final String VALUE_COUNT = "value_count";
- public static final String NULL_COUNT = "null_count";
- public static final String MIN = "min";
- public static final String MAX = "max";
- public static final String TOTAL_SIZE = "total_size";
- public static final String TOTAL_UNCOMPRESSED_SIZE = "total_uncompressed_size";
-
- private Stats() {}
- }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 09996db..53ceb00 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -130,8 +130,9 @@ public class HoodieCommitMetadata implements Serializable {
public HashMap<String, String> getFileIdAndFullPaths(String basePath) {
HashMap<String, String> fullPaths = new HashMap<>();
for (Map.Entry<String, String> entry : getFileIdAndRelativePaths().entrySet()) {
- String fullPath =
- (entry.getValue() != null) ? (FSUtils.getPartitionPath(basePath, entry.getValue())).toString() : null;
+ String fullPath = entry.getValue() != null
+ ? FSUtils.getPartitionPath(basePath, entry.getValue()).toString()
+ : null;
fullPaths.put(entry.getKey(), fullPath);
}
return fullPaths;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java
index cf3bb52..9626e21 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieDeltaWriteStat.java
@@ -21,7 +21,6 @@ package org.apache.hudi.common.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.hudi.common.util.Option;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -30,13 +29,14 @@ import java.util.Map;
* Statistics about a single Hoodie delta log operation.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
+@SuppressWarnings("rawtypes")
public class HoodieDeltaWriteStat extends HoodieWriteStat {
private int logVersion;
private long logOffset;
private String baseFile;
private List<String> logFiles = new ArrayList<>();
- private Option<RecordsStats<? extends Map>> recordsStats = Option.empty();
+ private Option<Map<String, HoodieColumnRangeMetadata<Comparable>>> recordsStats = Option.empty();
public void setLogVersion(int logVersion) {
this.logVersion = logVersion;
@@ -74,23 +74,11 @@ public class HoodieDeltaWriteStat extends HoodieWriteStat {
return logFiles;
}
- public void setRecordsStats(RecordsStats<? extends Map> stats) {
+ public void setRecordsStats(Map<String, HoodieColumnRangeMetadata<Comparable>> stats) {
recordsStats = Option.of(stats);
}
- public Option<RecordsStats<? extends Map>> getRecordsStats() {
+ public Option<Map<String, HoodieColumnRangeMetadata<Comparable>>> getColumnStats() {
return recordsStats;
}
-
- public static class RecordsStats<T> implements Serializable {
- private final T recordsStats;
-
- public RecordsStats(T recordsStats) {
- this.recordsStats = recordsStats;
- }
-
- public T getStats() {
- return recordsStats;
- }
- }
}
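With the RecordsStats wrapper class gone, attaching per-column ranges to a delta write stat looks roughly like this (a sketch with illustrative values, using the create() factory shown later in this patch):

    HoodieDeltaWriteStat stat = new HoodieDeltaWriteStat();
    Map<String, HoodieColumnRangeMetadata<Comparable>> ranges = new HashMap<>();
    ranges.put("price", HoodieColumnRangeMetadata.<Comparable>create(
        "file1.log", "price", 1.0d, 9.5d, 0, 42, 0, 0));
    stat.setRecordsStats(ranges);
    stat.getColumnStats().ifPresent(cols ->
        cols.forEach((col, range) ->
            System.out.println(col + ": [" + range.getMinValue() + ", " + range.getMaxValue() + "]")));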
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
index 531a090..cf90eff 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
@@ -40,6 +40,35 @@ public class DateTimeUtils {
Collections.unmodifiableMap(initMap());
/**
+ * Converts provided microseconds (from epoch) to {@link Instant}
+ */
+ public static Instant microsToInstant(long microsFromEpoch) {
+ long epochSeconds = microsFromEpoch / 1_000_000L;
+ long nanoAdjustment = (microsFromEpoch % 1_000_000L) * 1_000L;
+
+ return Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
+ }
+
+ /**
+ * Converts provided {@link Instant} to microseconds (from epoch)
+ */
+ public static long instantToMicros(Instant instant) {
+ long seconds = instant.getEpochSecond();
+ int nanos = instant.getNano();
+
+ if (seconds < 0 && nanos > 0) {
+ long micros = Math.multiplyExact(seconds + 1, 1_000_000L);
+ long adjustment = (nanos / 1_000L) - 1_000_000;
+
+ return Math.addExact(micros, adjustment);
+ } else {
+ long micros = Math.multiplyExact(seconds, 1_000_000L);
+
+ return Math.addExact(micros, nanos / 1_000L);
+ }
+ }
+
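A worked round trip for the new converters (illustrative values):

    Instant ts = Instant.parse("2022-04-02T17:15:52.123456Z");
    long micros = DateTimeUtils.instantToMicros(ts);   // seconds * 1_000_000 + microsecond fraction
    Instant back = DateTimeUtils.microsToInstant(micros);
    // back.equals(ts); the negative-seconds special case above also makes pre-epoch instants
    // round-trip correctly, e.g. instantToMicros(Instant.ofEpochSecond(-1, 999_999_000)) == -1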
+ /**
* Parse input String to a {@link java.time.Instant}.
*
* @param s Input String should be Epoch time in millisecond or ISO-8601 format.
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 0f45997..01618c5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -18,10 +18,30 @@
package org.apache.hudi.metadata;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.BooleanWrapper;
+import org.apache.hudi.avro.model.BytesWrapper;
+import org.apache.hudi.avro.model.DateWrapper;
+import org.apache.hudi.avro.model.DecimalWrapper;
+import org.apache.hudi.avro.model.DoubleWrapper;
+import org.apache.hudi.avro.model.FloatWrapper;
import org.apache.hudi.avro.model.HoodieMetadataBloomFilter;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.avro.model.HoodieMetadataFileInfo;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.avro.model.IntWrapper;
+import org.apache.hudi.avro.model.LongWrapper;
+import org.apache.hudi.avro.model.StringWrapper;
+import org.apache.hudi.avro.model.TimestampMicrosWrapper;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
@@ -35,31 +55,33 @@ import org.apache.hudi.common.util.hash.PartitionIndexID;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.io.storage.HoodieHFileReader;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
import java.io.IOException;
+import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
+import static org.apache.hudi.common.util.DateTimeUtils.instantToMicros;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartition;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.tryUpcastDecimal;
/**
* MetadataTable records are persisted with the schema defined in HoodieMetadata.avsc.
@@ -119,6 +141,8 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
public static final String COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE = "totalUncompressedSize";
public static final String COLUMN_STATS_FIELD_IS_DELETED = FIELD_IS_DELETED;
+ private static final Conversions.DecimalConversion AVRO_DECIMAL_CONVERSION = new Conversions.DecimalConversion();
+
private String key = null;
private int type = 0;
private Map<String, HoodieMetadataFileInfo> filesystemMetadata = null;
@@ -180,8 +204,8 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
columnStatMetadata = HoodieMetadataColumnStats.newBuilder()
.setFileName((String) columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME))
.setColumnName((String) columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME))
- .setMinValue((String) columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))
- .setMaxValue((String) columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))
+ .setMinValue(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))
+ .setMaxValue(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))
.setValueCount((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT))
.setNullCount((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT))
.setTotalSize((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))
@@ -351,7 +375,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
HoodieMetadataColumnStats previousColStatsRecord = previousRecord.getColumnStatMetadata().get();
HoodieMetadataColumnStats newColumnStatsRecord = getColumnStatMetadata().get();
- return HoodieTableMetadataUtil.mergeColumnStats(previousColStatsRecord, newColumnStatsRecord);
+ return mergeColumnStatsRecords(previousColStatsRecord, newColumnStatsRecord);
}
@Override
@@ -531,29 +555,69 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
return getColumnStatsIndexKey(partitionIndexID, fileIndexID, columnIndexID);
}
- public static Stream<HoodieRecord> createColumnStatsRecords(
- String partitionName, Collection<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList, boolean isDeleted) {
+ public static Stream<HoodieRecord> createColumnStatsRecords(String partitionName,
+ Collection<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
+ boolean isDeleted) {
return columnRangeMetadataList.stream().map(columnRangeMetadata -> {
HoodieKey key = new HoodieKey(getColumnStatsIndexKey(partitionName, columnRangeMetadata),
MetadataPartitionType.COLUMN_STATS.getPartitionPath());
+
HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(),
HoodieMetadataColumnStats.newBuilder()
.setFileName(new Path(columnRangeMetadata.getFilePath()).getName())
.setColumnName(columnRangeMetadata.getColumnName())
- .setMinValue(columnRangeMetadata.getMinValue() == null ? null :
- columnRangeMetadata.getMinValue().toString())
- .setMaxValue(columnRangeMetadata.getMaxValue() == null ? null :
- columnRangeMetadata.getMaxValue().toString())
+ .setMinValue(wrapStatisticValue(columnRangeMetadata.getMinValue()))
+ .setMaxValue(wrapStatisticValue(columnRangeMetadata.getMaxValue()))
.setNullCount(columnRangeMetadata.getNullCount())
.setValueCount(columnRangeMetadata.getValueCount())
.setTotalSize(columnRangeMetadata.getTotalSize())
.setTotalUncompressedSize(columnRangeMetadata.getTotalUncompressedSize())
.setIsDeleted(isDeleted)
.build());
+
return new HoodieAvroRecord<>(key, payload);
});
}
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataColumnStats prevColumnStats,
+ HoodieMetadataColumnStats newColumnStats) {
+ checkArgument(Objects.equals(prevColumnStats.getFileName(), newColumnStats.getFileName()));
+ checkArgument(Objects.equals(prevColumnStats.getColumnName(), newColumnStats.getColumnName()));
+
+ if (newColumnStats.getIsDeleted()) {
+ return newColumnStats;
+ }
+
+ Comparable minValue =
+ (Comparable) Stream.of(
+ (Comparable) unwrapStatisticValueWrapper(prevColumnStats.getMinValue()),
+ (Comparable) unwrapStatisticValueWrapper(newColumnStats.getMinValue()))
+ .filter(Objects::nonNull)
+ .min(Comparator.naturalOrder())
+ .orElse(null);
+
+ Comparable maxValue =
+ (Comparable) Stream.of(
+ (Comparable) unwrapStatisticValueWrapper(prevColumnStats.getMaxValue()),
+ (Comparable) unwrapStatisticValueWrapper(newColumnStats.getMaxValue()))
+ .filter(Objects::nonNull)
+ .max(Comparator.naturalOrder())
+ .orElse(null);
+
+ return HoodieMetadataColumnStats.newBuilder()
+ .setFileName(newColumnStats.getFileName())
+ .setColumnName(newColumnStats.getColumnName())
+ .setMinValue(wrapStatisticValue(minValue))
+ .setMaxValue(wrapStatisticValue(maxValue))
+ .setValueCount(prevColumnStats.getValueCount() + newColumnStats.getValueCount())
+ .setNullCount(prevColumnStats.getNullCount() + newColumnStats.getNullCount())
+ .setTotalSize(prevColumnStats.getTotalSize() + newColumnStats.getTotalSize())
+ .setTotalUncompressedSize(prevColumnStats.getTotalUncompressedSize() + newColumnStats.getTotalUncompressedSize())
+ .setIsDeleted(newColumnStats.getIsDeleted())
+ .build();
+ }
+
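Conceptually, the merge widens the value range and sums the counters for the same file/column pair (a sketch of inputs and outputs; the method itself is private and reached via preCombine):

    // prev:   min=3, max=7, valueCount=10, nullCount=1
    // new:    min=5, max=9, valueCount=4,  nullCount=0
    // merged: min=3, max=9, valueCount=14, nullCount=1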
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("HoodieMetadataPayload {");
@@ -579,6 +643,85 @@ public class HoodieMetadataPayload implements HoodieRecordPayload<HoodieMetadata
return sb.toString();
}
+ private static Object wrapStatisticValue(Comparable<?> statValue) {
+ if (statValue == null) {
+ return null;
+ } else if (statValue instanceof Date || statValue instanceof LocalDate) {
+ // NOTE: Due to breaking changes in code-gen b/w Avro 1.8.2 and 1.10, we can't
+ // rely on logical types to do proper encoding of the native Java types,
+ // and hence have to encode the statistic manually
+ LocalDate localDate = statValue instanceof LocalDate
+ ? (LocalDate) statValue
+ : ((Date) statValue).toLocalDate();
+ return DateWrapper.newBuilder().setValue((int) localDate.toEpochDay()).build();
+ } else if (statValue instanceof BigDecimal) {
+ Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema();
+ BigDecimal upcastDecimal = tryUpcastDecimal((BigDecimal) statValue, (LogicalTypes.Decimal) valueSchema.getLogicalType());
+ return DecimalWrapper.newBuilder()
+ .setValue(AVRO_DECIMAL_CONVERSION.toBytes(upcastDecimal, valueSchema, valueSchema.getLogicalType()))
+ .build();
+ } else if (statValue instanceof Timestamp) {
+ // NOTE: Due to breaking changes in code-gen b/w Avro 1.8.2 and 1.10, we can't
+ // rely on logical types to do proper encoding of the native Java types,
+ // and hence have to encode the statistic manually
+ Instant instant = ((Timestamp) statValue).toInstant();
+ return TimestampMicrosWrapper.newBuilder()
+ .setValue(instantToMicros(instant))
+ .build();
+ } else if (statValue instanceof Boolean) {
+ return BooleanWrapper.newBuilder().setValue((Boolean) statValue).build();
+ } else if (statValue instanceof Integer) {
+ return IntWrapper.newBuilder().setValue((Integer) statValue).build();
+ } else if (statValue instanceof Long) {
+ return LongWrapper.newBuilder().setValue((Long) statValue).build();
+ } else if (statValue instanceof Float) {
+ return FloatWrapper.newBuilder().setValue((Float) statValue).build();
+ } else if (statValue instanceof Double) {
+ return DoubleWrapper.newBuilder().setValue((Double) statValue).build();
+ } else if (statValue instanceof ByteBuffer) {
+ return BytesWrapper.newBuilder().setValue((ByteBuffer) statValue).build();
+ } else if (statValue instanceof String || statValue instanceof Utf8) {
+ return StringWrapper.newBuilder().setValue(statValue.toString()).build();
+ } else {
+ throw new UnsupportedOperationException(String.format("Unsupported type of the statistic (%s)", statValue.getClass()));
+ }
+ }
+
+ public static Comparable<?> unwrapStatisticValueWrapper(Object statValueWrapper) {
+ if (statValueWrapper == null) {
+ return null;
+ } else if (statValueWrapper instanceof DateWrapper) {
+ return LocalDate.ofEpochDay(((DateWrapper) statValueWrapper).getValue());
+ } else if (statValueWrapper instanceof DecimalWrapper) {
+ Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema();
+ return AVRO_DECIMAL_CONVERSION.fromBytes(((DecimalWrapper) statValueWrapper).getValue(), valueSchema, valueSchema.getLogicalType());
+ } else if (statValueWrapper instanceof TimestampMicrosWrapper) {
+ return microsToInstant(((TimestampMicrosWrapper) statValueWrapper).getValue());
+ } else if (statValueWrapper instanceof BooleanWrapper) {
+ return ((BooleanWrapper) statValueWrapper).getValue();
+ } else if (statValueWrapper instanceof IntWrapper) {
+ return ((IntWrapper) statValueWrapper).getValue();
+ } else if (statValueWrapper instanceof LongWrapper) {
+ return ((LongWrapper) statValueWrapper).getValue();
+ } else if (statValueWrapper instanceof FloatWrapper) {
+ return ((FloatWrapper) statValueWrapper).getValue();
+ } else if (statValueWrapper instanceof DoubleWrapper) {
+ return ((DoubleWrapper) statValueWrapper).getValue();
+ } else if (statValueWrapper instanceof BytesWrapper) {
+ return ((BytesWrapper) statValueWrapper).getValue();
+ } else if (statValueWrapper instanceof StringWrapper) {
+ return ((StringWrapper) statValueWrapper).getValue();
+ } else if (statValueWrapper instanceof GenericRecord) {
+ // NOTE: This branch could be hit b/c Avro records could be reconstructed
+ // as {@code GenericRecord}
+ // TODO add logical type decoding
+ GenericRecord record = (GenericRecord) statValueWrapper;
+ return (Comparable<?>) record.get("value");
+ } else {
+ throw new UnsupportedOperationException(String.format("Unsupported type of the statistic (%s)", statValueWrapper.getClass()));
+ }
+ }
+
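A small sketch of the wrapping scheme (wrapStatisticValue is private, so only the unwrap side is shown as callable; values are illustrative):

    Object wrapped = LongWrapper.newBuilder().setValue(42L).build();
    Comparable<?> unwrapped = HoodieMetadataPayload.unwrapStatisticValueWrapper(wrapped); // 42L

    // Dates are stored as days since epoch and come back as java.time.LocalDate
    Object wrappedDate = DateWrapper.newBuilder().setValue(19084).build();
    Comparable<?> day = HoodieMetadataPayload.unwrapStatisticValueWrapper(wrappedDate);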
private static void validatePayload(int type, Map<String, HoodieMetadataFileInfo> filesystemMetadata) {
if (type == METADATA_TYPE_FILE_LIST) {
filesystemMetadata.forEach((fileName, fileInfo) -> {
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 9e3eca3..63271cf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -18,6 +18,14 @@
package org.apache.hudi.metadata;
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.ConvertingGenericData;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
@@ -55,18 +63,13 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import javax.annotation.Nonnull;
-
import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -77,23 +80,19 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
-import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldValAsString;
-import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.COLUMN_RANGE_MERGE_FUNCTION;
-import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.MAX;
-import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.MIN;
-import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.NULL_COUNT;
-import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_SIZE;
-import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.TOTAL_UNCOMPRESSED_SIZE;
-import static org.apache.hudi.common.model.HoodieColumnRangeMetadata.Stats.VALUE_COUNT;
+import static org.apache.hudi.avro.HoodieAvroUtils.convertValueForSpecificDataTypes;
+import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
+import static org.apache.hudi.avro.HoodieAvroUtils.resolveNullableSchema;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
-import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper;
import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME;
import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
@@ -109,6 +108,100 @@ public class HoodieTableMetadataUtil {
protected static final String PARTITION_NAME_BLOOM_FILTERS = "bloom_filters";
/**
+ * Collects {@link HoodieColumnRangeMetadata} for the provided collection of records, as if
+ * the provided records had been persisted within the given {@code filePath}
+ *
+ * @param records target records to compute column range metadata for
+ * @param targetFields columns (fields) to be collected
+ * @param filePath file path value required for {@link HoodieColumnRangeMetadata}
+ *
+ * @return map of {@link HoodieColumnRangeMetadata} for each of the provided target fields for
+ * the collection of provided records
+ */
+ public static Map<String, HoodieColumnRangeMetadata<Comparable>> collectColumnRangeMetadata(List<IndexedRecord> records,
+ List<Schema.Field> targetFields,
+ String filePath) {
+ // Helper class to calculate column stats
+ class ColumnStats {
+ Object minValue;
+ Object maxValue;
+ long nullCount;
+ long valueCount;
+ }
+
+ HashMap<String, ColumnStats> allColumnStats = new HashMap<>();
+
+ // Collect stats for all columns by iterating through the records while accumulating
+ // the corresponding stats
+ records.forEach((record) -> {
+ // For each column (field) to be indexed, we have to update the corresponding
+ // column stats with the values from this record
+ targetFields.forEach(field -> {
+ ColumnStats colStats = allColumnStats.computeIfAbsent(field.name(), (ignored) -> new ColumnStats());
+
+ GenericRecord genericRecord = (GenericRecord) record;
+
+ final Object fieldVal = convertValueForSpecificDataTypes(field.schema(), genericRecord.get(field.name()), true);
+ final Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(genericRecord.getSchema(), field.name());
+
+ if (fieldVal != null && canCompare(fieldSchema)) {
+ // Set the min value of the field
+ if (colStats.minValue == null
+ || ConvertingGenericData.INSTANCE.compare(fieldVal, colStats.minValue, fieldSchema) < 0) {
+ colStats.minValue = fieldVal;
+ }
+
+ // Set the max value of the field
+ if (colStats.maxValue == null || ConvertingGenericData.INSTANCE.compare(fieldVal, colStats.maxValue, fieldSchema) > 0) {
+ colStats.maxValue = fieldVal;
+ }
+
+ colStats.valueCount++;
+ } else {
+ colStats.nullCount++;
+ }
+ });
+ });
+
+ Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, HoodieColumnRangeMetadata<Comparable>>> collector =
+ Collectors.toMap(colRangeMetadata -> colRangeMetadata.getColumnName(), Function.identity());
+
+ return (Map<String, HoodieColumnRangeMetadata<Comparable>>) targetFields.stream()
+ .map(field -> {
+ ColumnStats colStats = allColumnStats.get(field.name());
+ return HoodieColumnRangeMetadata.<Comparable>create(
+ filePath,
+ field.name(),
+ colStats == null ? null : coerceToComparable(field.schema(), colStats.minValue),
+ colStats == null ? null : coerceToComparable(field.schema(), colStats.maxValue),
+ colStats == null ? 0 : colStats.nullCount,
+ colStats == null ? 0 : colStats.valueCount,
+ // NOTE: Size and compressed size statistics are set to 0 to make sure we're not
+ // mixing up those provided by Parquet with the ones from other encodings,
+ // since those are not directly comparable
+ 0,
+ 0
+ );
+ })
+ .collect(collector);
+ }
+
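A usage sketch for the new collector (hypothetical schema and file path; not part of the patch):

    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Row\",\"fields\":[{\"name\":\"ts\",\"type\":\"long\"}]}");
    GenericRecord r1 = new GenericData.Record(schema);
    r1.put("ts", 5L);
    GenericRecord r2 = new GenericData.Record(schema);
    r2.put("ts", 9L);
    Map<String, HoodieColumnRangeMetadata<Comparable>> ranges =
        HoodieTableMetadataUtil.collectColumnRangeMetadata(
            Arrays.<IndexedRecord>asList(r1, r2), schema.getFields(), "2022/04/02/file1.log");
    // ranges.get("ts") -> min=5, max=9, valueCount=2, nullCount=0, sizes stubbed to 0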
+ /**
+ * Converts instance of {@link HoodieMetadataColumnStats} to {@link HoodieColumnRangeMetadata}
+ */
+ public static HoodieColumnRangeMetadata<Comparable> convertColumnStatsRecordToColumnRangeMetadata(HoodieMetadataColumnStats columnStats) {
+ return HoodieColumnRangeMetadata.<Comparable>create(
+ columnStats.getFileName(),
+ columnStats.getColumnName(),
+ unwrapStatisticValueWrapper(columnStats.getMinValue()),
+ unwrapStatisticValueWrapper(columnStats.getMaxValue()),
+ columnStats.getNullCount(),
+ columnStats.getValueCount(),
+ columnStats.getTotalSize(),
+ columnStats.getTotalUncompressedSize());
+ }
+
+ /**
* Delete the metadata table for the dataset. This will be invoked during upgrade/downgrade operation during which
* no other
* process should be running.
@@ -457,8 +550,11 @@ public class HoodieTableMetadataUtil {
int parallelism = Math.max(Math.min(deleteFileList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
return engineContext.parallelize(deleteFileList, parallelism)
.flatMap(deleteFileInfoPair -> {
- if (deleteFileInfoPair.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
- return getColumnStats(deleteFileInfoPair.getLeft(), deleteFileInfoPair.getRight(), dataTableMetaClient, columnsToIndex, true).iterator();
+ String partitionPath = deleteFileInfoPair.getLeft();
+ String filePath = deleteFileInfoPair.getRight();
+
+ if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return getColumnStatsRecords(partitionPath, filePath, dataTableMetaClient, columnsToIndex, true).iterator();
}
return Collections.emptyListIterator();
});
@@ -531,7 +627,8 @@ public class HoodieTableMetadataUtil {
}
if (recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS)) {
- final HoodieData<HoodieRecord> metadataColumnStatsRDD = convertFilesToColumnStatsRecords(engineContext, partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams);
+ final HoodieData<HoodieRecord> metadataColumnStatsRDD =
+ convertFilesToColumnStatsRecords(engineContext, partitionToDeletedFiles, partitionToAppendedFiles, recordsGenerationParams);
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, metadataColumnStatsRDD);
}
@@ -815,7 +912,7 @@ public class HoodieTableMetadataUtil {
return deletedFileList.stream().flatMap(deletedFile -> {
final String filePathWithPartition = partitionName + "/" + deletedFile;
- return getColumnStats(partition, filePathWithPartition, dataTableMetaClient, columnsToIndex, true);
+ return getColumnStatsRecords(partition, filePathWithPartition, dataTableMetaClient, columnsToIndex, true);
}).iterator();
});
allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
@@ -836,7 +933,7 @@ public class HoodieTableMetadataUtil {
return Stream.empty();
}
final String filePathWithPartition = partitionName + "/" + appendedFileNameLengthEntry.getKey();
- return getColumnStats(partition, filePathWithPartition, dataTableMetaClient, columnsToIndex, false);
+ return getColumnStatsRecords(partition, filePathWithPartition, dataTableMetaClient, columnsToIndex, false);
}).iterator();
});
@@ -1014,63 +1111,65 @@ public class HoodieTableMetadataUtil {
return Arrays.asList(tableConfig.getRecordKeyFields().get());
}
- public static HoodieMetadataColumnStats mergeColumnStats(HoodieMetadataColumnStats prevColumnStatsRecord,
- HoodieMetadataColumnStats newColumnStatsRecord) {
- checkArgument(prevColumnStatsRecord.getFileName().equals(newColumnStatsRecord.getFileName()));
- checkArgument(prevColumnStatsRecord.getColumnName().equals(newColumnStatsRecord.getColumnName()));
-
- if (newColumnStatsRecord.getIsDeleted()) {
- return newColumnStatsRecord;
+ private static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteStat writeStat,
+ HoodieTableMetaClient datasetMetaClient,
+ List<String> columnsToIndex) {
+ if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) writeStat).getColumnStats().isPresent()) {
+ Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = ((HoodieDeltaWriteStat) writeStat).getColumnStats().get();
+ Collection<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = columnRangeMap.values();
+ return HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), columnRangeMetadataList, false);
}
- return HoodieMetadataColumnStats.newBuilder()
- .setFileName(newColumnStatsRecord.getFileName())
- .setColumnName(newColumnStatsRecord.getColumnName())
- .setMinValue(Stream.of(prevColumnStatsRecord.getMinValue(), newColumnStatsRecord.getMinValue()).filter(Objects::nonNull).min(Comparator.naturalOrder()).orElse(null))
- .setMaxValue(Stream.of(prevColumnStatsRecord.getMinValue(), newColumnStatsRecord.getMinValue()).filter(Objects::nonNull).max(Comparator.naturalOrder()).orElse(null))
- .setValueCount(prevColumnStatsRecord.getValueCount() + newColumnStatsRecord.getValueCount())
- .setNullCount(prevColumnStatsRecord.getNullCount() + newColumnStatsRecord.getNullCount())
- .setTotalSize(prevColumnStatsRecord.getTotalSize() + newColumnStatsRecord.getTotalSize())
- .setTotalUncompressedSize(prevColumnStatsRecord.getTotalUncompressedSize() + newColumnStatsRecord.getTotalUncompressedSize())
- .setIsDeleted(newColumnStatsRecord.getIsDeleted())
- .build();
+ return getColumnStatsRecords(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
}
- public static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteStat writeStat,
- HoodieTableMetaClient datasetMetaClient,
- List<String> columnsToIndex) {
- if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) writeStat).getRecordsStats().isPresent()) {
- Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = ((HoodieDeltaWriteStat) writeStat).getRecordsStats().get().getStats();
- List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = new ArrayList<>(columnRangeMap.values());
- return HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), columnRangeMetadataList, false);
+ private static Stream<HoodieRecord> getColumnStatsRecords(String partitionPath,
+ String filePath,
+ HoodieTableMetaClient datasetMetaClient,
+ List<String> columnsToIndex,
+ boolean isDeleted) {
+ String partitionName = getPartition(partitionPath);
+ // NOTE: We have to chop the leading "/" to make sure Hadoop does not treat
+ // it as an absolute path
+ String filePartitionPath = filePath.startsWith("/") ? filePath.substring(1) : filePath;
+ String fileName = partitionName.equals(NON_PARTITIONED_NAME)
+ ? filePartitionPath
+ : filePartitionPath.substring(partitionName.length() + 1);
+
+ if (isDeleted) {
+ // TODO we should delete records instead of stubbing them
+ List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = columnsToIndex.stream()
+ .map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry))
+ .collect(Collectors.toList());
+
+ return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadataList, true);
}
- return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
+
+ List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadata =
+ readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient, columnsToIndex);
+
+ return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadata, false);
}
- private static Stream<HoodieRecord> getColumnStats(final String partitionPath, final String filePathWithPartition,
- HoodieTableMetaClient datasetMetaClient,
- List<String> columnsToIndex,
- boolean isDeleted) {
- final String partition = getPartition(partitionPath);
- final int offset = partition.equals(NON_PARTITIONED_NAME) ? (filePathWithPartition.startsWith("/") ? 1 : 0)
- : partition.length() + 1;
- final String fileName = filePathWithPartition.substring(offset);
-
- if (filePathWithPartition.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
- final Path fullFilePath = new Path(datasetMetaClient.getBasePath(), filePathWithPartition);
- List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList;
- if (!isDeleted) {
- columnRangeMetadataList = new ParquetUtils().readRangeFromParquetMetadata(
- datasetMetaClient.getHadoopConf(), fullFilePath, columnsToIndex);
- } else {
- // TODO we should delete records instead of stubbing them
- columnRangeMetadataList =
- columnsToIndex.stream().map(entry -> HoodieColumnRangeMetadata.stub(fileName, entry))
- .collect(Collectors.toList());
+ private static List<HoodieColumnRangeMetadata<Comparable>> readColumnRangeMetadataFrom(String filePath,
+ HoodieTableMetaClient datasetMetaClient,
+ List<String> columnsToIndex) {
+ try {
+ if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ Path fullFilePath = new Path(datasetMetaClient.getBasePath(), filePath);
+ List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList =
+ new ParquetUtils().readRangeFromParquetMetadata(datasetMetaClient.getHadoopConf(), fullFilePath, columnsToIndex);
+
+ return columnRangeMetadataList;
}
- return HoodieMetadataPayload.createColumnStatsRecords(partitionPath, columnRangeMetadataList, isDeleted);
- } else {
- throw new HoodieException("Column range index not supported for filePathWithPartition " + fileName);
+
+ LOG.warn("Column range index not supported for: " + filePath);
+ return Collections.emptyList();
+ } catch (Exception e) {
+ // NOTE: In case reading column range metadata from an individual file fails,
+ // we simply fall back to returning an empty list, in lieu of failing the whole task
+ LOG.error("Failed to fetch column range metadata for: " + filePath, e);
+ return Collections.emptyList();
}
}
@@ -1105,72 +1204,37 @@ public class HoodieTableMetadataUtil {
}
/**
- * Accumulates column range metadata for the given field and updates the column range map.
- *
- * @param field - column for which statistics will be computed
- * @param filePath - data file path
- * @param columnRangeMap - old column range statistics, which will be merged in this computation
- * @param columnToStats - map of column to map of each stat and its value
- */
- public static void accumulateColumnRanges(Schema.Field field, String filePath,
- Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
- Map<String, Map<String, Object>> columnToStats) {
- Map<String, Object> columnStats = columnToStats.get(field.name());
- HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = HoodieColumnRangeMetadata.create(
- filePath,
- field.name(),
- (Comparable) String.valueOf(columnStats.get(MIN)),
- (Comparable) String.valueOf(columnStats.get(MAX)),
- Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()),
- Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()),
- Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()),
- Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString())
- );
- columnRangeMap.merge(field.name(), columnRangeMetadata, COLUMN_RANGE_MERGE_FUNCTION);
- }
-
- /**
- * Aggregates column stats for each field.
- *
- * @param record - current record
- * @param fields - fields for which stats will be aggregated
- * @param columnToStats - map of column to map of each stat and its value which gets updates in this method
- * @param consistentLogicalTimestampEnabled - flag to deal with logical timestamp type when getting column value
+ * Upcasts the provided {@link BigDecimal} instance to align it with the scale/precision expected by
+ * the {@link org.apache.avro.LogicalTypes.Decimal} Avro logical type
*/
- public static void aggregateColumnStats(IndexedRecord record, List<Schema.Field> fields,
- Map<String, Map<String, Object>> columnToStats,
- boolean consistentLogicalTimestampEnabled) {
- if (!(record instanceof GenericRecord)) {
- throw new HoodieIOException("Record is not a generic type to get column range metadata!");
+ public static BigDecimal tryUpcastDecimal(BigDecimal value, final LogicalTypes.Decimal decimal) {
+ final int scale = decimal.getScale();
+ final int valueScale = value.scale();
+
+ boolean scaleAdjusted = false;
+ if (valueScale != scale) {
+ try {
+ value = value.setScale(scale, RoundingMode.UNNECESSARY);
+ scaleAdjusted = true;
+ } catch (ArithmeticException aex) {
+ throw new AvroTypeException(
+ "Cannot encode decimal with scale " + valueScale + " as scale " + scale + " without rounding");
+ }
}
- fields.forEach(field -> {
- Map<String, Object> columnStats = columnToStats.getOrDefault(field.name(), new HashMap<>());
- final String fieldVal = getNestedFieldValAsString((GenericRecord) record, field.name(), true, consistentLogicalTimestampEnabled);
- // update stats
- final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
- columnStats.put(TOTAL_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_SIZE, 0).toString()) + fieldSize);
- columnStats.put(TOTAL_UNCOMPRESSED_SIZE, Long.parseLong(columnStats.getOrDefault(TOTAL_UNCOMPRESSED_SIZE, 0).toString()) + fieldSize);
-
- if (!isNullOrEmpty(fieldVal)) {
- // set the min value of the field
- if (!columnStats.containsKey(MIN)) {
- columnStats.put(MIN, fieldVal);
- }
- if (fieldVal.compareTo(String.valueOf(columnStats.get(MIN))) < 0) {
- columnStats.put(MIN, fieldVal);
- }
- // set the max value of the field
- if (fieldVal.compareTo(String.valueOf(columnStats.getOrDefault(MAX, ""))) > 0) {
- columnStats.put(MAX, fieldVal);
- }
- // increment non-null value count
- columnStats.put(VALUE_COUNT, Long.parseLong(columnStats.getOrDefault(VALUE_COUNT, 0).toString()) + 1);
+ int precision = decimal.getPrecision();
+ int valuePrecision = value.precision();
+ if (valuePrecision > precision) {
+ if (scaleAdjusted) {
+ throw new AvroTypeException("Cannot encode decimal with precision " + valuePrecision + " as max precision "
+ + precision + ". This is after safely adjusting scale from " + valueScale + " to required " + scale);
} else {
- // increment null value count
- columnStats.put(NULL_COUNT, Long.parseLong(columnStats.getOrDefault(NULL_COUNT, 0).toString()) + 1);
+ throw new AvroTypeException(
+ "Cannot encode decimal with precision " + valuePrecision + " as max precision " + precision);
}
- });
+ }
+
+ return value;
}
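A worked example of the upcast semantics (illustrative values):

    LogicalTypes.Decimal target = LogicalTypes.decimal(8, 4); // precision 8, scale 4
    BigDecimal upcast = HoodieTableMetadataUtil.tryUpcastDecimal(new BigDecimal("1234.56"), target); // -> 1234.5600
    // new BigDecimal("123456.7") would throw AvroTypeException: after padding the scale to 4,
    // its precision (10) exceeds the max precision (8)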
private static Option<Schema> tryResolveSchemaForTable(HoodieTableMetaClient dataTableMetaClient) {
@@ -1178,14 +1242,82 @@ public class HoodieTableMetadataUtil {
return Option.empty();
}
- TableSchemaResolver schemaResolver = new TableSchemaResolver(dataTableMetaClient);
try {
+ TableSchemaResolver schemaResolver = new TableSchemaResolver(dataTableMetaClient);
return Option.of(schemaResolver.getTableAvroSchema());
} catch (Exception e) {
throw new HoodieException("Failed to get latest columns for " + dataTableMetaClient.getBasePath(), e);
}
}
+ /**
+ * Given a schema, coerces the provided value to an instance of {@link Comparable}, such that
+ * it can subsequently be used in column stats
+ *
+ * NOTE: This method has to stay compatible with the semantics of
+ * {@link ParquetUtils#readRangeFromParquetMetadata} as they are used in tandem
+ */
+ private static Comparable<?> coerceToComparable(Schema schema, Object val) {
+ if (val == null) {
+ return null;
+ }
+
+ switch (schema.getType()) {
+ case UNION:
+ // TODO we need to handle unions in general case as well
+ return coerceToComparable(resolveNullableSchema(schema), val);
+
+ case FIXED:
+ case BYTES:
+ if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+ return (Comparable<?>) val;
+ }
+ return (ByteBuffer) val;
+
+ case INT:
+ if (schema.getLogicalType() == LogicalTypes.date()
+ || schema.getLogicalType() == LogicalTypes.timeMillis()) {
+ // NOTE: This type will be either {@code java.sql.Date} or {@code org.joda.LocalDate}
+ // depending on the Avro version. Hence, we simply cast it to {@code Comparable<?>}
+ return (Comparable<?>) val;
+ }
+ return (Integer) val;
+
+ case LONG:
+ if (schema.getLogicalType() == LogicalTypes.timeMicros()
+ || schema.getLogicalType() == LogicalTypes.timestampMicros()
+ || schema.getLogicalType() == LogicalTypes.timestampMillis()) {
+ // NOTE: For these timestamp logical types, the concrete Java class depends on
+ // the Avro version. Hence, we simply cast it to {@code Comparable<?>}
+ return (Comparable<?>) val;
+ }
+ return (Long) val;
+
+ case STRING:
+ case FLOAT:
+ case DOUBLE:
+ case BOOLEAN:
+ return (Comparable<?>) val;
+
+ // TODO add support for those types
+ case ENUM:
+ case MAP:
+ case NULL:
+ case RECORD:
+ case ARRAY:
+ return null;
+
+ default:
+ throw new IllegalStateException("Unexpected type: " + schema.getType());
+ }
+ }
+
+ private static boolean canCompare(Schema schema) {
+ return schema.getType() != Schema.Type.MAP;
+ }
+
public static Set<String> getInflightMetadataPartitions(HoodieTableConfig tableConfig) {
return StringUtils.toSet(tableConfig.getMetadataPartitionsInflight());
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
index 9fb268e..85505c0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -31,6 +31,8 @@ public enum MetadataPartitionType {
// FileId prefix used for all file groups in this partition.
private final String fileIdPrefix;
// Total file groups
+ // TODO fix: an enum should not have any mutable state, as this compromises the whole
+ // idea of the enum being a static, immutable entity
private int fileGroupCount = 1;
MetadataPartitionType(final String partitionPath, final String fileIdPrefix) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index e64964e..8c57dc8 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -19,6 +19,7 @@
package org.apache.hudi.avro;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.exception.SchemaCompatibilityException;
import org.apache.avro.JsonProperties;
@@ -27,12 +28,14 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -88,6 +91,12 @@ public class TestHoodieAvroUtils {
+ "{\"name\":\"decimal_col\",\"type\":[\"null\","
+ "{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":8,\"scale\":4}],\"default\":null}]}";
+ private static String SCHEMA_WITH_NESTED_FIELD = "{\"name\":\"MyClass\",\"type\":\"record\",\"namespace\":\"com.acme.avro\",\"fields\":["
+ + "{\"name\":\"firstname\",\"type\":\"string\"},"
+ + "{\"name\":\"lastname\",\"type\":\"string\"},"
+ + "{\"name\":\"student\",\"type\":{\"name\":\"student\",\"type\":\"record\",\"fields\":["
+ + "{\"name\":\"firstname\",\"type\":[\"null\" ,\"string\"],\"default\": null},{\"name\":\"lastname\",\"type\":[\"null\" ,\"string\"],\"default\": null}]}}]}";
+
@Test
public void testPropsPresent() {
Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(EXAMPLE_SCHEMA));
@@ -248,7 +257,7 @@ public class TestHoodieAvroUtils {
}
@Test
- public void testGetNestedFieldValWithDecimalFiled() {
+ public void testGetNestedFieldValWithDecimalField() {
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(SCHEMA_WITH_DECIMAL_FIELD));
rec.put("key_col", "key");
BigDecimal bigDecimal = new BigDecimal("1234.5678");
@@ -264,4 +273,36 @@ public class TestHoodieAvroUtils {
assertEquals(0, buffer.position());
}
+ @Test
+ public void testGetNestedFieldSchema() throws IOException {
+ Schema schema = SchemaTestUtil.getEvolvedSchema();
+ GenericRecord rec = new GenericData.Record(schema);
+ rec.put("field1", "key1");
+ rec.put("field2", "val1");
+ rec.put("name", "val2");
+ rec.put("favorite_number", 2);
+ // test simple field schema
+ assertEquals(Schema.create(Schema.Type.STRING), getNestedFieldSchemaFromWriteSchema(rec.getSchema(), "field1"));
+
+ GenericRecord rec2 = new GenericData.Record(schema);
+ rec2.put("field1", "key1");
+ rec2.put("field2", "val1");
+ rec2.put("name", "val2");
+ rec2.put("favorite_number", 12);
+ // test comparison of non-string type
+ assertEquals(-1, GenericData.get().compare(rec.get("favorite_number"), rec2.get("favorite_number"), getNestedFieldSchemaFromWriteSchema(rec.getSchema(), "favorite_number")));
+
+ // test nested field schema
+ Schema nestedSchema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD);
+ GenericRecord rec3 = new GenericData.Record(nestedSchema);
+ rec3.put("firstname", "person1");
+ rec3.put("lastname", "person2");
+ GenericRecord studentRecord = new GenericData.Record(rec3.getSchema().getField("student").schema());
+ studentRecord.put("firstname", "person1");
+ studentRecord.put("lastname", "person2");
+ rec3.put("student", studentRecord);
+
+ assertEquals(Schema.create(Schema.Type.STRING), getNestedFieldSchemaFromWriteSchema(rec3.getSchema(), "student.firstname"));
+ assertEquals(Schema.create(Schema.Type.STRING), getNestedFieldSchemaFromWriteSchema(nestedSchema, "student.firstname"));
+ }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 5b3dabd..71917f9 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -1749,40 +1749,39 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
FileCreateUtils.createDeltaCommit(basePath, "100", fs);
- HoodieLogFileReader reader = new HoodieLogFileReader(fs, new HoodieLogFile(writer.getLogFile().getPath(),
- fs.getFileStatus(writer.getLogFile().getPath()).getLen()), SchemaTestUtil.getSimpleSchema(),
- bufferSize, readBlocksLazily, true);
+ HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
+ try (HoodieLogFileReader reader = new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), bufferSize, readBlocksLazily, true)) {
- assertTrue(reader.hasPrev(), "Last block should be available");
- HoodieLogBlock prevBlock = reader.prev();
- HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock;
+ assertTrue(reader.hasPrev(), "Last block should be available");
+ HoodieLogBlock prevBlock = reader.prev();
+ HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock;
- List<IndexedRecord> recordsRead1 = getRecords(dataBlockRead);
- assertEquals(copyOfRecords3.size(), recordsRead1.size(),
- "Third records size should be equal to the written records size");
- assertEquals(copyOfRecords3, recordsRead1,
- "Both records lists should be the same. (ordering guaranteed)");
+ List<IndexedRecord> recordsRead1 = getRecords(dataBlockRead);
+ assertEquals(copyOfRecords3.size(), recordsRead1.size(),
+ "Third records size should be equal to the written records size");
+ assertEquals(copyOfRecords3, recordsRead1,
+ "Both records lists should be the same. (ordering guaranteed)");
- assertTrue(reader.hasPrev(), "Second block should be available");
- prevBlock = reader.prev();
- dataBlockRead = (HoodieDataBlock) prevBlock;
- List<IndexedRecord> recordsRead2 = getRecords(dataBlockRead);
- assertEquals(copyOfRecords2.size(), recordsRead2.size(),
- "Read records size should be equal to the written records size");
- assertEquals(copyOfRecords2, recordsRead2,
- "Both records lists should be the same. (ordering guaranteed)");
+ assertTrue(reader.hasPrev(), "Second block should be available");
+ prevBlock = reader.prev();
+ dataBlockRead = (HoodieDataBlock) prevBlock;
+ List<IndexedRecord> recordsRead2 = getRecords(dataBlockRead);
+ assertEquals(copyOfRecords2.size(), recordsRead2.size(),
+ "Read records size should be equal to the written records size");
+ assertEquals(copyOfRecords2, recordsRead2,
+ "Both records lists should be the same. (ordering guaranteed)");
- assertTrue(reader.hasPrev(), "First block should be available");
- prevBlock = reader.prev();
- dataBlockRead = (HoodieDataBlock) prevBlock;
- List<IndexedRecord> recordsRead3 = getRecords(dataBlockRead);
- assertEquals(copyOfRecords1.size(), recordsRead3.size(),
- "Read records size should be equal to the written records size");
- assertEquals(copyOfRecords1, recordsRead3,
- "Both records lists should be the same. (ordering guaranteed)");
+ assertTrue(reader.hasPrev(), "First block should be available");
+ prevBlock = reader.prev();
+ dataBlockRead = (HoodieDataBlock) prevBlock;
+ List<IndexedRecord> recordsRead3 = getRecords(dataBlockRead);
+ assertEquals(copyOfRecords1.size(), recordsRead3.size(),
+ "Read records size should be equal to the written records size");
+ assertEquals(copyOfRecords1, recordsRead3,
+ "Both records lists should be the same. (ordering guaranteed)");
- assertFalse(reader.hasPrev());
- reader.close();
+ assertFalse(reader.hasPrev());
+ }
}
@ParameterizedTest
@@ -1830,19 +1829,20 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
writer.close();
// First round of reads - we should be able to read the first block and then EOF
- HoodieLogFileReader reader =
- new HoodieLogFileReader(fs, new HoodieLogFile(writer.getLogFile().getPath(),
- fs.getFileStatus(writer.getLogFile().getPath()).getLen()), schema, bufferSize, readBlocksLazily, true);
+ HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
- assertTrue(reader.hasPrev(), "Last block should be available");
- HoodieLogBlock block = reader.prev();
- assertTrue(block instanceof HoodieDataBlock, "Last block should be datablock");
+ try (HoodieLogFileReader reader =
+ new HoodieLogFileReader(fs, logFile, schema, bufferSize, readBlocksLazily, true)) {
- assertTrue(reader.hasPrev(), "Last block should be available");
- assertThrows(CorruptedLogFileException.class, () -> {
- reader.prev();
- });
- reader.close();
+ assertTrue(reader.hasPrev(), "Last block should be available");
+ HoodieLogBlock block = reader.prev();
+ assertTrue(block instanceof HoodieDataBlock, "Last block should be datablock");
+
+ assertTrue(reader.hasPrev(), "Last block should be available");
+ assertThrows(CorruptedLogFileException.class, () -> {
+ reader.prev();
+ });
+ }
}
@ParameterizedTest
@@ -1882,28 +1882,28 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
FileCreateUtils.createDeltaCommit(basePath, "100", fs);
- HoodieLogFileReader reader = new HoodieLogFileReader(fs, new HoodieLogFile(writer.getLogFile().getPath(),
- fs.getFileStatus(writer.getLogFile().getPath()).getLen()), SchemaTestUtil.getSimpleSchema(),
- bufferSize, readBlocksLazily, true);
+ HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen());
+ try (HoodieLogFileReader reader =
+ new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), bufferSize, readBlocksLazily, true)) {
- assertTrue(reader.hasPrev(), "Third block should be available");
- reader.moveToPrev();
+ assertTrue(reader.hasPrev(), "Third block should be available");
+ reader.moveToPrev();
- assertTrue(reader.hasPrev(), "Second block should be available");
- reader.moveToPrev();
+ assertTrue(reader.hasPrev(), "Second block should be available");
+ reader.moveToPrev();
- // After moving twice, this last reader.prev() should read the First block written
- assertTrue(reader.hasPrev(), "First block should be available");
- HoodieLogBlock prevBlock = reader.prev();
- HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock;
- List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
- assertEquals(copyOfRecords1.size(), recordsRead.size(),
- "Read records size should be equal to the written records size");
- assertEquals(copyOfRecords1, recordsRead,
- "Both records lists should be the same. (ordering guaranteed)");
+ // After moving twice, this last reader.prev() should read the first block written
+ assertTrue(reader.hasPrev(), "First block should be available");
+ HoodieLogBlock prevBlock = reader.prev();
+ HoodieDataBlock dataBlockRead = (HoodieDataBlock) prevBlock;
+ List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
+ assertEquals(copyOfRecords1.size(), recordsRead.size(),
+ "Read records size should be equal to the written records size");
+ assertEquals(copyOfRecords1, recordsRead,
+ "Both records lists should be the same. (ordering guaranteed)");
- assertFalse(reader.hasPrev());
- reader.close();
+ assertFalse(reader.hasPrev());
+ }
}
@Test
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index a403f92..c80c5e2 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -99,15 +99,6 @@ public class FileCreateUtils {
return String.format("%s_%s_%s%s%s.%s", fileId, WRITE_TOKEN, instantTime, fileExtension, HoodieTableMetaClient.MARKER_EXTN, ioType);
}
- private static void createMetaFile(String basePath, String instantTime, String suffix) throws IOException {
- Path parentPath = Paths.get(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
- Files.createDirectories(parentPath);
- Path metaFilePath = parentPath.resolve(instantTime + suffix);
- if (Files.notExists(metaFilePath)) {
- Files.createFile(metaFilePath);
- }
- }
-
private static void createMetaFile(String basePath, String instantTime, String suffix, FileSystem fs) throws IOException {
org.apache.hadoop.fs.Path parentPath = new org.apache.hadoop.fs.Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
if (!fs.exists(parentPath)) {
@@ -119,12 +110,20 @@ public class FileCreateUtils {
}
}
+ private static void createMetaFile(String basePath, String instantTime, String suffix) throws IOException {
+ createMetaFile(basePath, instantTime, suffix, "".getBytes());
+ }
+
private static void createMetaFile(String basePath, String instantTime, String suffix, byte[] content) throws IOException {
Path parentPath = Paths.get(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
Files.createDirectories(parentPath);
Path metaFilePath = parentPath.resolve(instantTime + suffix);
if (Files.notExists(metaFilePath)) {
- Files.write(metaFilePath, content);
+ if (content.length == 0) {
+ Files.createFile(metaFilePath);
+ } else {
+ Files.write(metaFilePath, content);
+ }
}
}
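
For context on the createMetaFile consolidation above: the zero-content overload now delegates to the byte[] overload, which special-cases empty content back to a bare file creation. A minimal standalone sketch of the converged behavior, in Scala over java.nio, with the ".hoodie" meta-folder name hardcoded purely for illustration:

    import java.nio.file.{Files, Path, Paths}

    // Illustrative stand-in for the consolidated createMetaFile logic:
    // empty content yields a bare touch, non-empty content is written out.
    def createMetaFile(basePath: String, instantTime: String, suffix: String,
                       content: Array[Byte] = Array.emptyByteArray): Unit = {
      val parentPath: Path = Paths.get(basePath, ".hoodie")
      Files.createDirectories(parentPath)
      val metaFilePath = parentPath.resolve(instantTime + suffix)
      if (Files.notExists(metaFilePath)) {
        if (content.isEmpty) Files.createFile(metaFilePath)
        else Files.write(metaFilePath, content)
      }
    }
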
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index fa6998b..881197e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -117,8 +117,12 @@ import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
public class HoodieTestTable {
+ public static final String PHONY_TABLE_SCHEMA =
+ "{\"namespace\": \"org.apache.hudi.avro.model\", \"type\": \"record\", \"name\": \"PhonyRecord\", \"fields\": []}";
+
private static final Logger LOG = LogManager.getLogger(HoodieTestTable.class);
private static final Random RANDOM = new Random();
+
protected static HoodieTestTableState testTableState;
private final List<String> inflightCommits = new ArrayList<>();
@@ -215,7 +219,7 @@ public class HoodieTestTable {
writeStats.addAll(generateHoodieWriteStatForPartitionLogFiles(testTableState.getPartitionToLogFileInfoMap(commitTime), commitTime, bootstrap));
}
Map<String, String> extraMetadata = createImmutableMap("test", "test");
- return buildMetadata(writeStats, partitionToReplaceFileIds, Option.of(extraMetadata), operationType, EMPTY_STRING, action);
+ return buildMetadata(writeStats, partitionToReplaceFileIds, Option.of(extraMetadata), operationType, PHONY_TABLE_SCHEMA, action);
}
public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException {
@@ -779,7 +783,7 @@ public class HoodieTestTable {
this.withBaseFilesInPartition(partition, testTableState.getPartitionToBaseFileInfoMap(commitTime).get(partition));
}
HoodieReplaceCommitMetadata replaceMetadata =
- (HoodieReplaceCommitMetadata) buildMetadata(writeStats, partitionToReplaceFileIds, Option.empty(), CLUSTER, EMPTY_STRING,
+ (HoodieReplaceCommitMetadata) buildMetadata(writeStats, partitionToReplaceFileIds, Option.empty(), CLUSTER, PHONY_TABLE_SCHEMA,
REPLACE_COMMIT_ACTION);
addReplaceCommit(commitTime, Option.empty(), Option.empty(), replaceMetadata);
return replaceMetadata;
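
The PHONY_TABLE_SCHEMA constant replaces EMPTY_STRING in the metadata built above, presumably so that anything parsing the schema field of commit metadata receives valid Avro rather than an empty string. A quick hedged check using the plain Avro API (nothing Hudi-specific):

    import org.apache.avro.Schema

    val phonyTableSchema =
      """{"namespace": "org.apache.hudi.avro.model", "type": "record", "name": "PhonyRecord", "fields": []}"""

    // Parses into a record schema with zero fields...
    val parsed = new Schema.Parser().parse(phonyTableSchema)
    assert(parsed.getType == Schema.Type.RECORD && parsed.getFields.isEmpty)

    // ...whereas new Schema.Parser().parse("") throws SchemaParseException.
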
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
new file mode 100644
index 0000000..ea44170
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.ColumnStatsIndexSupport.{composeIndexSchema, deserialize, tryUnpackNonNullVal}
+import org.apache.hudi.metadata.{HoodieMetadataPayload, MetadataPartitionType}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+
+import scala.collection.immutable.TreeSet
+
+/**
+ * Mixin trait abstracting away the heavy lifting of interacting with the Metadata Table's Column Stats Index,
+ * providing convenient interfaces to read it, transpose it, etc.
+ */
+trait ColumnStatsIndexSupport {
+
+ def readColumnStatsIndex(spark: SparkSession, metadataTablePath: String): DataFrame = {
+ val targetColStatsIndexColumns = Seq(
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
+
+ val requiredMetadataIndexColumns =
+ (targetColStatsIndexColumns :+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName =>
+ s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}")
+
+ // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
+ val metadataTableDF = spark.read.format("org.apache.hudi")
+ .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
+
+ // TODO filter on (column, partition) prefix
+ val colStatsDF = metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
+ .select(requiredMetadataIndexColumns.map(col): _*)
+
+ colStatsDF
+ }
+
+ /**
+ * Transposes and converts the raw table format of the Column Stats Index representation,
+ * where each row/record corresponds to an individual (column, file) pair, into a table format
+ * where each row corresponds to a single file, with the statistics for individual columns
+ * collated within that row:
+ *
+ * Metadata Table Column Stats Index format:
+ *
+ * <pre>
+ * +---------------------------+------------+------------+------------+-------------+
+ * | fileName | columnName | minValue | maxValue | num_nulls |
+ * +---------------------------+------------+------------+------------+-------------+
+ * | one_base_file.parquet | A | 1 | 10 | 0 |
+ * | another_base_file.parquet | A | -10 | 0 | 5 |
+ * +---------------------------+------------+------------+------------+-------------+
+ * </pre>
+ *
+ * Returned table format
+ *
+ * <pre>
+ * +---------------------------+------------+------------+-------------+
+ * | file | A_minValue | A_maxValue | A_num_nulls |
+ * +---------------------------+------------+------------+-------------+
+ * | one_base_file.parquet | 1 | 10 | 0 |
+ * | another_base_file.parquet | -10 | 0 | 5 |
+ * +---------------------------+------------+------------+-------------+
+ * </pre>
+ *
+ * NOTE: The Column Stats Index might contain statistics for many columns (if not all), while
+ * the query at hand might only reference a handful of those. As such, we collect all the
+ * column references from the filtering expressions and only transpose records corresponding
+ * to those referenced columns
+ *
+ * @param spark Spark session ref
+ * @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table
+ * @param targetColumns target columns to be included into the final table
+ * @param tableSchema schema of the source data table
+ * @return reshaped table according to the format outlined above
+ */
+ def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, targetColumns: Seq[String], tableSchema: StructType): DataFrame = {
+ val colStatsSchema = colStatsDF.schema
+ val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({
+ case (field, ordinal) => (field.name, ordinal)
+ }).toMap
+
+ val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap
+
+ // NOTE: We're sorting the columns to make sure final index schema matches layout
+ // of the transposed table
+ val sortedColumns = TreeSet(targetColumns: _*)
+
+ val transposedRDD = colStatsDF.rdd
+ .filter(row => sortedColumns.contains(row.getString(colStatsSchemaOrdinalsMap("columnName"))))
+ .map { row =>
+ val (minValue, _) = tryUnpackNonNullVal(row.getAs[Row](colStatsSchemaOrdinalsMap("minValue")))
+ val (maxValue, _) = tryUnpackNonNullVal(row.getAs[Row](colStatsSchemaOrdinalsMap("maxValue")))
+
+ val colName = row.getString(colStatsSchemaOrdinalsMap("columnName"))
+ val colType = tableSchemaFieldMap(colName).dataType
+
+ val rowValsSeq = row.toSeq.toArray
+
+ rowValsSeq(colStatsSchemaOrdinalsMap("minValue")) = deserialize(minValue, colType)
+ rowValsSeq(colStatsSchemaOrdinalsMap("maxValue")) = deserialize(maxValue, colType)
+
+ Row(rowValsSeq:_*)
+ }
+ .groupBy(r => r.getString(colStatsSchemaOrdinalsMap("fileName")))
+ .foldByKey(Seq[Row]()) {
+ case (_, columnRows) =>
+ // Rows seq is always non-empty (otherwise the key wouldn't have been grouped at all)
+ val fileName = columnRows.head.get(colStatsSchemaOrdinalsMap("fileName"))
+ val coalescedRowValuesSeq = columnRows.toSeq
+ // NOTE: It's crucial to maintain appropriate ordering of the columns
+ // matching table layout
+ .sortBy(_.getString(colStatsSchemaOrdinalsMap("columnName")))
+ .foldLeft(Seq[Any](fileName)) {
+ case (acc, columnRow) =>
+ acc ++ Seq("minValue", "maxValue", "nullCount").map(ord => columnRow.get(colStatsSchemaOrdinalsMap(ord)))
+ }
+
+ Seq(Row(coalescedRowValuesSeq:_*))
+ }
+ .values
+ .flatMap(it => it)
+
+ // NOTE: It's crucial to maintain appropriate ordering of the columns
+ // matching table layout: hence, we cherry-pick individual columns
+ // instead of simply filtering the schema down to the columns we're interested in
+ val indexSchema = composeIndexSchema(sortedColumns.toSeq, tableSchema)
+
+ spark.createDataFrame(transposedRDD, indexSchema)
+ }
+}
+
+object ColumnStatsIndexSupport {
+
+ private val COLUMN_STATS_INDEX_FILE_COLUMN_NAME = "fileName"
+ private val COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME = "minValue"
+ private val COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME = "maxValue"
+ private val COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME = "num_nulls"
+
+ /**
+ * @VisibleForTesting
+ */
+ def composeIndexSchema(targetColumnNames: Seq[String], tableSchema: StructType): StructType = {
+ val fileNameField = StructField(COLUMN_STATS_INDEX_FILE_COLUMN_NAME, StringType, nullable = true, Metadata.empty)
+ val targetFields = targetColumnNames.map(colName => tableSchema.fields.find(f => f.name == colName).get)
+
+ StructType(
+ targetFields.foldLeft(Seq(fileNameField)) {
+ case (acc, field) =>
+ acc ++ Seq(
+ composeColumnStatStructType(field.name, COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME, field.dataType),
+ composeColumnStatStructType(field.name, COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME, field.dataType),
+ composeColumnStatStructType(field.name, COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME, LongType))
+ }
+ )
+ }
+
+ @inline def getMinColumnNameFor(colName: String): String =
+ formatColName(colName, COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME)
+
+ @inline def getMaxColumnNameFor(colName: String): String =
+ formatColName(colName, COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME)
+
+ @inline def getNumNullsColumnNameFor(colName: String): String =
+ formatColName(colName, COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME)
+
+ @inline private def formatColName(col: String, statName: String) = { // TODO add escaping for
+ String.format("%s_%s", col, statName)
+ }
+
+ @inline private def composeColumnStatStructType(col: String, statName: String, dataType: DataType) =
+ StructField(formatColName(col, statName), dataType, nullable = true, Metadata.empty)
+
+ private def tryUnpackNonNullVal(statStruct: Row): (Any, Int) =
+ statStruct.toSeq.zipWithIndex
+ .find(_._1 != null)
+ // NOTE: First non-null value will be a wrapper (converted into Row), bearing a single
+ // value
+ .map { case (value, ord) => (value.asInstanceOf[Row].get(0), ord)}
+ .getOrElse((null, -1))
+
+ private def deserialize(value: Any, dataType: DataType): Any = {
+ dataType match {
+ // NOTE: Since we can't rely on Avro's "date", and "timestamp-micros" logical-types, we're
+ // manually encoding corresponding values as int and long w/in the Column Stats Index and
+ // here we have to decode those back into corresponding logical representation.
+ case TimestampType => DateTimeUtils.toJavaTimestamp(value.asInstanceOf[Long])
+ case DateType => DateTimeUtils.toJavaDate(value.asInstanceOf[Int])
+
+ // NOTE: All integral types of size less than Int are encoded as Ints in MT
+ case ShortType => value.asInstanceOf[Int].toShort
+ case ByteType => value.asInstanceOf[Int].toByte
+
+ case _ => value
+ }
+ }
+}
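
To make the transposition described in the scaladoc above concrete, here is a minimal, self-contained sketch over plain Scala collections (deliberately Spark-free; the actual implementation runs over an RDD as shown in the diff), using the same (fileName, columnName, minValue, maxValue, nullCount) shape:

    case class ColStatRow(fileName: String, columnName: String,
                          minValue: Any, maxValue: Any, nullCount: Long)

    // One row per (file, column) pair, as stored in the Metadata Table...
    val rawIndex = Seq(
      ColStatRow("one_base_file.parquet",     "A",   1,  10, 0),
      ColStatRow("another_base_file.parquet", "A", -10,   0, 5))

    // ...transposed into one row per file, with per-column stats collated
    // and columns sorted to keep a deterministic layout.
    val transposed: Map[String, Seq[Any]] =
      rawIndex.groupBy(_.fileName).map { case (file, rows) =>
        val collated = rows.sortBy(_.columnName)
          .flatMap(r => Seq(r.minValue, r.maxValue, r.nullCount))
        file -> collated
      }
    // transposed("one_base_file.parquet") == Seq(1, 10, 0L)  // A_minValue, A_maxValue, A_num_nulls
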
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 7fa3e93..82cd1f4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -24,19 +24,17 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper.{getMaxColumnNameFor, getMinColumnNameFor, getNumNullsColumnNameFor}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
-import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, MetadataPartitionType}
+import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
-import org.apache.spark.sql.functions.col
import org.apache.spark.sql.hudi.{DataSkippingUtils, HoodieSqlCommonUtils}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{StringType, StructType}
-import org.apache.spark.sql.{Column, SparkSession}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import java.text.SimpleDateFormat
@@ -81,7 +79,8 @@ case class HoodieFileIndex(spark: SparkSession,
specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
fileStatusCache = fileStatusCache
)
- with FileIndex {
+ with FileIndex
+ with ColumnStatsIndexSupport {
override def rootPaths: Seq[Path] = queryPaths.asScala
@@ -202,61 +201,12 @@ case class HoodieFileIndex(spark: SparkSession,
if (!isDataSkippingEnabled || !fs.exists(new Path(metadataTablePath)) || queryFilters.isEmpty) {
Option.empty
} else {
- val targetColStatsIndexColumns = Seq(
- HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
- HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
- HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
- HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
-
- val requiredMetadataIndexColumns =
- (targetColStatsIndexColumns :+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName =>
- s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}")
-
- // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
- val metadataTableDF = spark.read.format("org.apache.hudi")
- .load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
-
- // TODO filter on (column, partition) prefix
- val colStatsDF = metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
- .select(requiredMetadataIndexColumns.map(col): _*)
-
+ val colStatsDF: DataFrame = readColumnStatsIndex(spark, metadataTablePath)
val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
// Persist DF to avoid re-computing column statistics unraveling
withPersistence(colStatsDF) {
- // Metadata Table bears rows in the following format
- //
- // +---------------------------+------------+------------+------------+-------------+
- // | fileName | columnName | minValue | maxValue | num_nulls |
- // +---------------------------+------------+------------+------------+-------------+
- // | one_base_file.parquet | A | 1 | 10 | 0 |
- // | another_base_file.parquet | A | -10 | 0 | 5 |
- // +---------------------------+------------+------------+------------+-------------+
- //
- // While Data Skipping utils are expecting following (transposed) format, where per-column stats are
- // essentially transposed (from rows to columns):
- //
- // +---------------------------+------------+------------+-------------+
- // | file | A_minValue | A_maxValue | A_num_nulls |
- // +---------------------------+------------+------------+-------------+
- // | one_base_file.parquet | 1 | 10 | 0 |
- // | another_base_file.parquet | -10 | 0 | 5 |
- // +---------------------------+------------+------------+-------------+
- //
- // NOTE: Column Stats Index might potentially contain statistics for many columns (if not all), while
- // query at hand might only be referencing a handful of those. As such, we collect all the
- // column references from the filtering expressions, and only transpose records corresponding to the
- // columns referenced in those
- val transposedColStatsDF =
- queryReferencedColumns.map(colName =>
- colStatsDF.filter(col(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).equalTo(colName))
- .select(targetColStatsIndexColumns.map(col): _*)
- .withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, getNumNullsColumnNameFor(colName))
- .withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, getMinColumnNameFor(colName))
- .withColumnRenamed(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, getMaxColumnNameFor(colName))
- )
- .reduceLeft((left, right) =>
- left.join(right, usingColumn = HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME))
+ val transposedColStatsDF: DataFrame = transposeColumnStatsIndex(spark, colStatsDF, queryReferencedColumns, schema)
// Persist DF to avoid re-computing column statistics unraveling
withPersistence(transposedColStatsDF) {
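
With the mixin in place, the data-skipping path in HoodieFileIndex reduces to the two calls shown above. A hedged sketch of the resulting flow (the function below is hypothetical and elides the persistence wrappers used in the real code):

    import org.apache.hudi.ColumnStatsIndexSupport
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.{DataFrame, SparkSession}

    // `index` is any instance mixing in ColumnStatsIndexSupport, e.g. a HoodieFileIndex.
    def transposedColumnStats(spark: SparkSession,
                              index: ColumnStatsIndexSupport,
                              metadataTablePath: String,
                              queryReferencedColumns: Seq[String],
                              tableSchema: StructType): DataFrame = {
      val colStatsDF = index.readColumnStatsIndex(spark, metadataTablePath)
      index.transposeColumnStatsIndex(spark, colStatsDF, queryReferencedColumns, tableSchema)
    }
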
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieSparkTypeUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieSparkTypeUtils.scala
similarity index 79%
rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieSparkTypeUtils.scala
rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieSparkTypeUtils.scala
index d5d9587..3b0fcf0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieSparkTypeUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieSparkTypeUtils.scala
@@ -15,14 +15,21 @@
* limitations under the License.
*/
-package org.apache.spark
+package org.apache.spark.sql
-import org.apache.spark.sql.types.{DataType, NumericType, StringType}
+import org.apache.spark.sql.types.{DataType, DecimalType, NumericType, StringType}
// TODO unify w/ DataTypeUtils
object HoodieSparkTypeUtils {
/**
+ * Returns whether DecimalType `one` is wider than `another`. If so, `another`
+ * can be cast into `one` safely without losing any precision or range.
+ */
+ def isWiderThan(one: DecimalType, another: DecimalType) =
+ one.isWiderThan(another)
+
+ /**
* Checks whether casting expression of [[from]] [[DataType]] to [[to]] [[DataType]] will
* preserve ordering of the elements
*/
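
The move into the org.apache.spark.sql package is what allows this helper to call DecimalType.isWiderThan, which Spark keeps private[sql]; the new method simply re-exports it. A quick sketch of the semantics:

    import org.apache.spark.sql.HoodieSparkTypeUtils
    import org.apache.spark.sql.types.DecimalType

    // DecimalType(precision, scale): every (5, 2) value fits into (10, 2),
    // so casting (5, 2) -> (10, 2) loses neither precision nor range.
    val wide   = DecimalType(10, 2)
    val narrow = DecimalType(5, 2)

    assert(HoodieSparkTypeUtils.isWiderThan(wide, narrow))
    assert(!HoodieSparkTypeUtils.isWiderThan(narrow, wide))
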
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeRDDUtils.scala
similarity index 62%
rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala
rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeRDDUtils.scala
index a21a296..8995701 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeRDDUtils.scala
@@ -1,12 +1,13 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,11 +16,12 @@
* limitations under the License.
*/
-package org.apache.spark
+package org.apache.spark.sql
import org.apache.hudi.HoodieUnsafeRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
import org.apache.spark.util.MutablePair
/**
@@ -27,6 +29,10 @@ import org.apache.spark.util.MutablePair
*/
object HoodieUnsafeRDDUtils {
+ // TODO scala-doc
+ def createDataFrame(spark: SparkSession, rdd: RDD[InternalRow], structType: StructType): DataFrame =
+ spark.internalCreateDataFrame(rdd, structType)
+
/**
* Canonical implementation of the [[RDD#collect]] for [[HoodieUnsafeRDD]], returning a properly
* copied [[Array]] of [[InternalRow]]s
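
Likewise, createDataFrame above exists because SparkSession.internalCreateDataFrame is private[sql]: building a DataFrame straight from an RDD[InternalRow] (no round-trip through Row) needs a bridge living in org.apache.spark.sql. A hedged usage sketch; the call site below is hypothetical:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.{DataFrame, HoodieUnsafeRDDUtils, SparkSession}

    // `rows` must already be encoded as Catalyst InternalRows matching `schema`;
    // no Row-level conversion or validation is applied on this path.
    def toDataFrame(spark: SparkSession, rows: RDD[InternalRow], schema: StructType): DataFrame =
      HoodieUnsafeRDDUtils.createDataFrame(spark, rows, schema)
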
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
index b7ddd28..bdaddd3 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
@@ -17,9 +17,9 @@
package org.apache.spark.sql.hudi
+import org.apache.hudi.ColumnStatsIndexSupport.{getMaxColumnNameFor, getMinColumnNameFor, getNumNullsColumnNameFor}
import org.apache.hudi.SparkAdapterSupport
import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper.{getMaxColumnNameFor, getMinColumnNameFor, getNumNullsColumnNameFor}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
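
Only the import source changes here; the <column>_<stat> naming convention the data-skipping filters rely on is unchanged. Per formatColName's "%s_%s" scheme shown earlier, the helpers map as follows:

    import org.apache.hudi.ColumnStatsIndexSupport.{getMaxColumnNameFor, getMinColumnNameFor, getNumNullsColumnNameFor}

    // For an indexed column "A", the transposed index table carries:
    assert(getMinColumnNameFor("A")      == "A_minValue")
    assert(getMaxColumnNameFor("A")      == "A_maxValue")
    assert(getNumNullsColumnNameFor("A") == "A_num_nulls")
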
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
similarity index 80%
rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java
rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
index d34480c..a60fac2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/ColumnStatsIndexHelper.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.hudi.index.columnstats;
+package org.apache.hudi;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
@@ -41,17 +41,15 @@ import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.FloatType;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
-import org.apache.spark.sql.types.LongType$;
-import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StringType;
-import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.util.SerializableConfiguration;
import scala.collection.JavaConversions;
+import scala.collection.JavaConverters$;
import javax.annotation.Nonnull;
import java.math.BigDecimal;
@@ -63,49 +61,9 @@ import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+// TODO merge w/ ColumnStatsIndexSupport
public class ColumnStatsIndexHelper {
- private static final String COLUMN_STATS_INDEX_FILE_COLUMN_NAME = "file";
- private static final String COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME = "minValue";
- private static final String COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME = "maxValue";
- private static final String COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME = "num_nulls";
-
- public static String getMinColumnNameFor(String colName) {
- return composeZIndexColName(colName, COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME);
- }
-
- public static String getMaxColumnNameFor(String colName) {
- return composeZIndexColName(colName, COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME);
- }
-
- public static String getNumNullsColumnNameFor(String colName) {
- return composeZIndexColName(colName, COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME);
- }
-
- /**
- * @VisibleForTesting
- */
- @Nonnull
- public static StructType composeIndexSchema(@Nonnull List<StructField> zorderedColumnsSchemas) {
- List<StructField> schema = new ArrayList<>();
- schema.add(new StructField(COLUMN_STATS_INDEX_FILE_COLUMN_NAME, StringType$.MODULE$, true, Metadata.empty()));
- zorderedColumnsSchemas.forEach(colSchema -> {
- schema.add(composeColumnStatStructType(colSchema.name(), COLUMN_STATS_INDEX_MIN_VALUE_STAT_NAME, colSchema.dataType()));
- schema.add(composeColumnStatStructType(colSchema.name(), COLUMN_STATS_INDEX_MAX_VALUE_STAT_NAME, colSchema.dataType()));
- schema.add(composeColumnStatStructType(colSchema.name(), COLUMN_STATS_INDEX_NUM_NULLS_STAT_NAME, LongType$.MODULE$));
- });
- return StructType$.MODULE$.apply(schema);
- }
-
- private static StructField composeColumnStatStructType(String col, String statName, DataType dataType) {
- return new StructField(composeZIndexColName(col, statName), dataType, true, Metadata.empty());
- }
-
- private static String composeZIndexColName(String col, String statName) {
- // TODO add escaping for
- return String.format("%s_%s", col, statName);
- }
-
public static Pair<Object, Object>
fetchMinMaxValues(
@Nonnull DataType colType,
@@ -199,10 +157,16 @@ public class ColumnStatsIndexHelper {
SparkContext sc = sparkSession.sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
+ List<String> columnNames = orderedColumnSchemas.stream()
+ .map(StructField::name)
+ .collect(Collectors.toList());
+
SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration());
int numParallelism = (baseFilesPaths.size() / 3 + 1);
- List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos;
+
String previousJobDescription = sc.getLocalProperty("spark.job.description");
+
+ List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos;
try {
jsc.setJobDescription("Listing parquet column statistics");
colMinMaxInfos =
@@ -215,9 +179,7 @@ public class ColumnStatsIndexHelper {
utils.readRangeFromParquetMetadata(
serializableConfiguration.value(),
new Path(path),
- orderedColumnSchemas.stream()
- .map(StructField::name)
- .collect(Collectors.toList())
+ columnNames
)
.stream()
)
@@ -274,7 +236,10 @@ public class ColumnStatsIndexHelper {
})
.filter(Objects::nonNull);
- StructType indexSchema = composeIndexSchema(orderedColumnSchemas);
+ StructType indexSchema = ColumnStatsIndexSupport$.MODULE$.composeIndexSchema(
+ JavaConverters$.MODULE$.collectionAsScalaIterableConverter(columnNames).asScala().toSeq(),
+ StructType$.MODULE$.apply(orderedColumnSchemas)
+ );
return sparkSession.createDataFrame(allMetaDataRDD, indexSchema);
}
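
The Java helper now defers schema composition to the Scala-side composeIndexSchema via the converters shown above. For reference, a small sketch of the resulting layout for two indexed columns:

    import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
    import org.apache.spark.sql.types._

    val tableSchema = StructType(Seq(
      StructField("A", LongType),
      StructField("B", StringType)))

    val indexSchema = composeIndexSchema(Seq("A", "B"), tableSchema)
    // => fileName: string, then per column in the given order:
    //    A_minValue/A_maxValue: long, A_num_nulls: long,
    //    B_minValue/B_maxValue: string, B_num_nulls: long
    assert(indexSchema.fieldNames.head == "fileName")
    assert(indexSchema.fields.length == 1 + 2 * 3)
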
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/column-stats-index-table.json
new file mode 100644
index 0000000..1ed929c
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/column-stats-index-table.json
@@ -0,0 +1,4 @@
+{"c1_maxValue":769,"c1_minValue":309,"c1_num_nulls":0,"c2_maxValue":" 769sdc","c2_minValue":" 309sdc","c2_num_nulls":0,"c3_maxValue":919.769,"c3_minValue":76.430,"c3_num_nulls":0,"c4_maxValue":"2021-11-19T20:40:55.543-08:00","c4_minValue":"2021-11-19T20:40:55.521-08:00","c4_num_nulls":0,"c5_maxValue":78,"c5_minValue":32,"c5_num_nulls":0,"c6_maxValue":"2020-11-14","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"uQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_min [...]
+{"c1_maxValue":932,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 932sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_num_nulls":0,"c4_maxValue":"2021-11-19T20:40:55.549-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_num_nulls":0,"c5_maxValue":94,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"xw==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue [...]
+{"c1_maxValue":943,"c1_minValue":89,"c1_num_nulls":0,"c2_maxValue":" 943sdc","c2_minValue":" 200sdc","c2_num_nulls":0,"c3_maxValue":854.690,"c3_minValue":100.556,"c3_num_nulls":0,"c4_maxValue":"2021-11-19T20:40:55.549-08:00","c4_minValue":"2021-11-19T20:40:55.508-08:00","c4_num_nulls":0,"c5_maxValue":95,"c5_minValue":10,"c5_num_nulls":0,"c6_maxValue":"2020-10-10","c6_minValue":"2020-01-10","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"LA==","c7_num_nulls":0,"c8_maxValue":9,"c8_min [...]
+{"c1_maxValue":959,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.507-08:00","c4_num_nulls":0,"c5_maxValue":97,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minVa [...]
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/updated-column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/updated-column-stats-index-table.json
new file mode 100644
index 0000000..b5486d1
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/updated-column-stats-index-table.json
@@ -0,0 +1,8 @@
+{"c1_maxValue":568,"c1_minValue":8,"c1_num_nulls":0,"c2_maxValue":" 8sdc","c2_minValue":" 111sdc","c2_num_nulls":0,"c3_maxValue":979.272,"c3_minValue":82.111,"c3_num_nulls":0,"c4_maxValue":"2021-11-18T23:34:44.193-08:00","c4_minValue":"2021-11-18T23:34:44.159-08:00","c4_num_nulls":0,"c5_maxValue":58,"c5_minValue":2,"c5_num_nulls":0,"c6_maxValue":"2020-11-08","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"9g==","c7_minValue":"Ag==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue [...]
+{"c1_maxValue":715,"c1_minValue":76,"c1_num_nulls":0,"c2_maxValue":" 76sdc","c2_minValue":" 224sdc","c2_num_nulls":0,"c3_maxValue":958.579,"c3_minValue":246.427,"c3_num_nulls":0,"c4_maxValue":"2021-11-18T23:34:44.199-08:00","c4_minValue":"2021-11-18T23:34:44.166-08:00","c4_num_nulls":0,"c5_maxValue":73,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-21","c6_minValue":"2020-01-16","c6_num_nulls":0,"c7_maxValue":"+g==","c7_minValue":"LA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minVa [...]
+{"c1_maxValue":768,"c1_minValue":59,"c1_num_nulls":0,"c2_maxValue":" 768sdc","c2_minValue":" 118sdc","c2_num_nulls":0,"c3_maxValue":959.131,"c3_minValue":64.768,"c3_num_nulls":0,"c4_maxValue":"2021-11-18T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.164-08:00","c4_num_nulls":0,"c5_maxValue":78,"c5_minValue":7,"c5_num_nulls":0,"c6_maxValue":"2020-11-20","c6_minValue":"2020-05-04","c6_num_nulls":0,"c7_maxValue":"zw==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minVa [...]
+{"c1_maxValue":769,"c1_minValue":309,"c1_num_nulls":0,"c2_maxValue":" 769sdc","c2_minValue":" 309sdc","c2_num_nulls":0,"c3_maxValue":919.769,"c3_minValue":76.430,"c3_num_nulls":0,"c4_maxValue":"2021-11-19T20:40:55.543-08:00","c4_minValue":"2021-11-19T20:40:55.521-08:00","c4_num_nulls":0,"c5_maxValue":78,"c5_minValue":32,"c5_num_nulls":0,"c6_maxValue":"2020-11-14","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"uQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_min [...]
+{"c1_maxValue":770,"c1_minValue":129,"c1_num_nulls":0,"c2_maxValue":" 770sdc","c2_minValue":" 129sdc","c2_num_nulls":0,"c3_maxValue":977.328,"c3_minValue":153.431,"c3_num_nulls":0,"c4_maxValue":"2021-11-18T23:34:44.201-08:00","c4_minValue":"2021-11-18T23:34:44.169-08:00","c4_num_nulls":0,"c5_maxValue":78,"c5_minValue":14,"c5_num_nulls":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-15","c6_num_nulls":0,"c7_maxValue":"rw==","c7_minValue":"Ag==","c7_num_nulls":0,"c8_maxValue":9,"c8_mi [...]
+{"c1_maxValue":932,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 932sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_num_nulls":0,"c4_maxValue":"2021-11-19T20:40:55.549-08:00","c4_minValue":"2021-11-19T20:40:55.339-08:00","c4_num_nulls":0,"c5_maxValue":94,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"xw==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue [...]
+{"c1_maxValue":943,"c1_minValue":89,"c1_num_nulls":0,"c2_maxValue":" 943sdc","c2_minValue":" 200sdc","c2_num_nulls":0,"c3_maxValue":854.690,"c3_minValue":100.556,"c3_num_nulls":0,"c4_maxValue":"2021-11-19T20:40:55.549-08:00","c4_minValue":"2021-11-19T20:40:55.508-08:00","c4_num_nulls":0,"c5_maxValue":95,"c5_minValue":10,"c5_num_nulls":0,"c6_maxValue":"2020-10-10","c6_minValue":"2020-01-10","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"LA==","c7_num_nulls":0,"c8_maxValue":9,"c8_min [...]
+{"c1_maxValue":959,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c4_maxValue":"2021-11-19T20:40:55.550-08:00","c4_minValue":"2021-11-19T20:40:55.507-08:00","c4_num_nulls":0,"c5_maxValue":97,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minVa [...]
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json
deleted file mode 100644
index 00d16c6..0000000
--- a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table-merged.json
+++ /dev/null
@@ -1,8 +0,0 @@
-{"c1_maxValue":272,"c1_minValue":8,"c1_num_nulls":0,"c2_maxValue":" 8sdc","c2_minValue":" 129sdc","c2_num_nulls":0,"c3_maxValue":979.272,"c3_minValue":430.129,"c3_num_nulls":0,"c5_maxValue":28,"c5_minValue":2,"c5_num_nulls":0,"c6_maxValue":"2020-11-20","c6_minValue":"2020-03-23","c6_num_nulls":0,"c7_maxValue":"8A==","c7_minValue":"Ag==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-xxx-c000.snappy.parquet"}
-{"c1_maxValue":486,"c1_minValue":59,"c1_num_nulls":0,"c2_maxValue":" 79sdc","c2_minValue":" 111sdc","c2_num_nulls":0,"c3_maxValue":771.590,"c3_minValue":82.111,"c3_num_nulls":0,"c5_maxValue":50,"c5_minValue":7,"c5_num_nulls":0,"c6_maxValue":"2020-11-21","c6_minValue":"2020-01-22","c6_num_nulls":0,"c7_maxValue":"5g==","c7_minValue":"Ow==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-xxx-c000.snappy.parquet"}
-{"c1_maxValue":559,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 74sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c5_maxValue":57,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-09","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Gw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-xxx-c000.snappy.parquet"}
-{"c1_maxValue":639,"c1_minValue":323,"c1_num_nulls":0,"c2_maxValue":" 639sdc","c2_minValue":" 323sdc","c2_num_nulls":0,"c3_maxValue":811.638,"c3_minValue":100.556,"c3_num_nulls":0,"c5_maxValue":65,"c5_minValue":33,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"fw==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-xxx-c000.snappy.parquet"}
-{"c1_maxValue":719,"c1_minValue":125,"c1_num_nulls":0,"c2_maxValue":" 719sdc","c2_minValue":" 125sdc","c2_num_nulls":0,"c3_maxValue":958.579,"c3_minValue":153.125,"c3_num_nulls":0,"c5_maxValue":73,"c5_minValue":14,"c5_num_nulls":0,"c6_maxValue":"2020-09-27","c6_minValue":"2020-01-16","c6_num_nulls":0,"c7_maxValue":"+g==","c7_minValue":"OA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-xxx-c000.snappy.parquet"}
-{"c1_maxValue":770,"c1_minValue":300,"c1_num_nulls":0,"c2_maxValue":" 770sdc","c2_minValue":" 300sdc","c2_num_nulls":0,"c3_maxValue":977.328,"c3_minValue":64.768,"c3_num_nulls":0,"c5_maxValue":78,"c5_minValue":31,"c5_num_nulls":0,"c6_maxValue":"2020-10-21","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"rw==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-xxx-c000.snappy.parquet"}
-{"c1_maxValue":945,"c1_minValue":355,"c1_num_nulls":0,"c2_maxValue":" 945sdc","c2_minValue":" 355sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":374.882,"c3_num_nulls":0,"c5_maxValue":96,"c5_minValue":37,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_num_nulls":0,"c7_maxValue":"sQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-xxx-c000.snappy.parquet"}
-{"c1_maxValue":959,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":916.697,"c3_minValue":19.000,"c3_num_nulls":0,"c5_maxValue":97,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-xxx-c000.snappy.parquet"}
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json
deleted file mode 100644
index a633e31..0000000
--- a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/z-index-table.json
+++ /dev/null
@@ -1,4 +0,0 @@
-{"c1_maxValue":559,"c1_minValue":74,"c1_num_nulls":0,"c2_maxValue":" 74sdc","c2_minValue":" 181sdc","c2_num_nulls":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_num_nulls":0,"c5_maxValue":57,"c5_minValue":9,"c5_num_nulls":0,"c6_maxValue":"2020-11-09","c6_minValue":"2020-01-08","c6_num_nulls":0,"c7_maxValue":"1Q==","c7_minValue":"Gw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00001-xxx-c000.snappy.parquet"}
-{"c1_maxValue":639,"c1_minValue":323,"c1_num_nulls":0,"c2_maxValue":" 639sdc","c2_minValue":" 323sdc","c2_num_nulls":0,"c3_maxValue":811.638,"c3_minValue":100.556,"c3_num_nulls":0,"c5_maxValue":65,"c5_minValue":33,"c5_num_nulls":0,"c6_maxValue":"2020-09-09","c6_minValue":"2020-01-23","c6_num_nulls":0,"c7_maxValue":"fw==","c7_minValue":"Kw==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00000-xxx-c000.snappy.parquet"}
-{"c1_maxValue":945,"c1_minValue":355,"c1_num_nulls":0,"c2_maxValue":" 945sdc","c2_minValue":" 355sdc","c2_num_nulls":0,"c3_maxValue":994.355,"c3_minValue":374.882,"c3_num_nulls":0,"c5_maxValue":96,"c5_minValue":37,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-02-25","c6_num_nulls":0,"c7_maxValue":"sQ==","c7_minValue":"AQ==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00002-xxx-c000.snappy.parquet"}
-{"c1_maxValue":959,"c1_minValue":0,"c1_num_nulls":0,"c2_maxValue":" 959sdc","c2_minValue":" 0sdc","c2_num_nulls":0,"c3_maxValue":916.697,"c3_minValue":19.000,"c3_num_nulls":0,"c5_maxValue":97,"c5_minValue":1,"c5_num_nulls":0,"c6_maxValue":"2020-11-22","c6_minValue":"2020-01-01","c6_num_nulls":0,"c7_maxValue":"yA==","c7_minValue":"AA==","c7_num_nulls":0,"c8_maxValue":9,"c8_minValue":9,"c8_num_nulls":0,"file":"part-00003-xxx-c000.snappy.parquet"}
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
index 07c1011..10b4faf 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
@@ -17,7 +17,7 @@
package org.apache.hudi
-import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper
+import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.sql.catalyst.expressions.{Expression, Not}
import org.apache.spark.sql.functions.{col, lower}
@@ -35,24 +35,22 @@ import java.sql.Timestamp
import scala.collection.JavaConverters._
// NOTE: Only A, B columns are indexed
-case class IndexRow(
- file: String,
+case class IndexRow(fileName: String,
- // Corresponding A column is LongType
- A_minValue: Long = -1,
- A_maxValue: Long = -1,
- A_num_nulls: Long = -1,
+ // Corresponding A column is LongType
+ A_minValue: Long = -1,
+ A_maxValue: Long = -1,
+ A_num_nulls: Long = -1,
- // Corresponding B column is StringType
- B_minValue: String = null,
- B_maxValue: String = null,
- B_num_nulls: Long = -1,
+ // Corresponding B column is StringType
+ B_minValue: String = null,
+ B_maxValue: String = null,
+ B_num_nulls: Long = -1,
- // Corresponding B column is TimestampType
- C_minValue: Timestamp = null,
- C_maxValue: Timestamp = null,
- C_num_nulls: Long = -1
-) {
+ // Corresponding C column is TimestampType
+ C_minValue: Timestamp = null,
+ C_maxValue: Timestamp = null,
+ C_num_nulls: Long = -1) {
def toRow: Row = Row(productIterator.toSeq: _*)
}
@@ -79,29 +77,27 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSuppor
)
)
- val indexSchema: StructType =
- ColumnStatsIndexHelper.composeIndexSchema(
- sourceTableSchema.fields.toSeq
- .filter(f => indexedCols.contains(f.name))
- .asJava
- )
+ val indexSchema: StructType = composeIndexSchema(indexedCols, sourceTableSchema)
@ParameterizedTest
@MethodSource(
Array(
- "testBasicLookupFilterExpressionsSource",
- "testAdvancedLookupFilterExpressionsSource",
- "testCompositeFilterExpressionsSource"
+ "testBasicLookupFilterExpressionsSource",
+ "testAdvancedLookupFilterExpressionsSource",
+ "testCompositeFilterExpressionsSource"
))
def testLookupFilterExpressions(sourceExpr: String, input: Seq[IndexRow], output: Seq[String]): Unit = {
+ // We have to fix the timezone to make sure the output of all date-bound utilities
+ // is consistent with the fixtures
spark.sqlContext.setConf(SESSION_LOCAL_TIMEZONE.key, "UTC")
+
val resolvedExpr: Expression = exprUtils.resolveExpr(spark, sourceExpr, sourceTableSchema)
val lookupFilter = DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr(resolvedExpr, indexSchema)
val indexDf = spark.createDataFrame(input.map(_.toRow).asJava, indexSchema)
val rows = indexDf.where(new Column(lookupFilter))
- .select("file")
+ .select("fileName")
.collect()
.map(_.getString(0))
.toSeq
@@ -121,7 +117,7 @@ class TestDataSkippingUtils extends HoodieClientTestBase with SparkAdapterSuppor
val indexDf = spark.createDataset(input)
val rows = indexDf.where(new Column(lookupFilter))
- .select("file")
+ .select("fileName")
.collect()
.map(_.getString(0))
.toSeq
@@ -340,7 +336,7 @@ object TestDataSkippingUtils {
arguments(
// Filter out all rows that contain A = 0 AND B = 'abc'
- "A != 0 OR B != 'abc'",
+ "A != 0 OR B != 'abc'",
Seq(
IndexRow("file_1", 1, 2, 0),
IndexRow("file_2", -1, 1, 0),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 7c20be6..5d10a1d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -20,25 +20,28 @@ package org.apache.hudi.functional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
+import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema
+import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD}
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.ParquetUtils
-import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper
+import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
+import org.apache.hudi.metadata.HoodieTableMetadata
import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
import org.apache.spark.sql._
-import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.typedLit
import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Tag, Test}
+import org.junit.jupiter.api._
import java.math.BigInteger
import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters._
import scala.util.Random
-// TODO repurpose to test Column Stats in Metadata Table
-@Disabled
@Tag("functional")
-class TestColumnStatsIndex extends HoodieClientTestBase {
+class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSupport {
var spark: SparkSession = _
val sourceTableSchema =
@@ -67,140 +70,98 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
}
@Test
- def testZIndexTableComposition(): Unit = {
- val targetParquetTablePath = tempDir.resolve("index/zorder/input-table").toAbsolutePath.toString
- val sourceJSONTablePath = getClass.getClassLoader.getResource("index/zorder/input-table-json").toString
+ def testMetadataColumnStatsIndex(): Unit = {
+ val opts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
+ HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true",
+ HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+ )
- bootstrapParquetInputTableFromJSON(sourceJSONTablePath, targetParquetTablePath)
+ setTableName("hoodie_test")
+ initMetaClient()
+
+ val sourceJSONTablePath = getClass.getClassLoader.getResource("index/zorder/input-table-json").toString
- val inputDf =
// NOTE: Schema here is provided for validation that the input data is in the appropriate format
- spark.read
- .schema(sourceTableSchema)
- .parquet(targetParquetTablePath)
-
- val zorderedCols = Seq("c1", "c2", "c3", "c5", "c6", "c7", "c8")
- val zorderedColsSchemaFields = inputDf.schema.fields.filter(f => zorderedCols.contains(f.name)).toSeq
-
- // {@link TimestampType} is not supported, and will throw -- hence skipping "c4"
- val newZIndexTableDf = null
-// ColumnStatsIndexHelper.buildColumnStatsTableFor(
-// inputDf.sparkSession,
-// inputDf.inputFiles.toSeq.asJava,
-// zorderedColsSchemaFields.asJava
-// )
-
- val indexSchema =
- ColumnStatsIndexHelper.composeIndexSchema(
- sourceTableSchema.fields.filter(f => zorderedCols.contains(f.name)).toSeq.asJava
- )
-
- // Collect Z-index stats manually (reading individual Parquet files)
- val manualZIndexTableDf =
- buildColumnStatsTableManually(targetParquetTablePath, zorderedCols, indexSchema)
-
- // NOTE: Z-index is built against stats collected w/in Parquet footers, which will be
- // represented w/ corresponding Parquet schema (INT, INT64, INT96, etc).
- //
- // When stats are collected manually, produced Z-index table is inherently coerced into the
- // schema of the original source Parquet base-file and therefore we have to similarly coerce newly
- // built Z-index table (built off Parquet footers) into the canonical index schema (built off the
- // original source file schema)
- assertEquals(asJson(sort(manualZIndexTableDf)), asJson(sort(newZIndexTableDf)))
-
- // Match against expected Z-index table
- val expectedZIndexTableDf =
- spark.read
- .schema(indexSchema)
- .json(getClass.getClassLoader.getResource("index/zorder/z-index-table.json").toString)
+ val inputDF = spark.read.schema(sourceTableSchema).json(sourceJSONTablePath)
- assertEquals(asJson(sort(expectedZIndexTableDf)), asJson(sort(replace(newZIndexTableDf))))
- }
+ inputDF
+ .sort("c1")
+ .repartition(4, new Column("c1"))
+ .write
+ .format("hudi")
+ .options(opts)
+ .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
- @Test
- def testZIndexTableMerge(): Unit = {
- val testZIndexPath = new Path(basePath, "zindex")
-
- val firstParquetTablePath = tempDir.resolve("index/zorder/input-table").toAbsolutePath.toString
- val firstJSONTablePath = getClass.getClassLoader.getResource("index/zorder/input-table-json").toString
-
- // Bootstrap FIRST source Parquet table
- bootstrapParquetInputTableFromJSON(firstJSONTablePath, firstParquetTablePath)
-
- val zorderedCols = Seq("c1", "c2", "c3", "c5", "c6", "c7", "c8")
- val indexSchema =
- ColumnStatsIndexHelper.composeIndexSchema(
- sourceTableSchema.fields.filter(f => zorderedCols.contains(f.name)).toSeq.asJava
- )
-
- //
- // Bootstrap Z-index table
- //
-
- val firstCommitInstance = "0"
- val firstInputDf = spark.read.parquet(firstParquetTablePath)
-
-// ColumnStatsIndexHelper.updateColumnStatsIndexFor(
-// firstInputDf.sparkSession,
-// sourceTableSchema,
-// firstInputDf.inputFiles.toSeq.asJava,
-// zorderedCols.asJava,
-// testZIndexPath.toString,
-// firstCommitInstance,
-// Seq().asJava
-// )
-
- // NOTE: We don't need to provide schema upon reading from Parquet, since Spark will be able
- // to reliably retrieve it
- val initialZIndexTable =
- spark.read
- .parquet(new Path(testZIndexPath, firstCommitInstance).toString)
-
- val expectedInitialZIndexTableDf =
- spark.read
- .schema(indexSchema)
- .json(getClass.getClassLoader.getResource("index/zorder/z-index-table.json").toString)
+ metaClient = HoodieTableMetaClient.reload(metaClient)
- assertEquals(asJson(sort(expectedInitialZIndexTableDf)), asJson(sort(replace(initialZIndexTable))))
+ val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath)
- // Bootstrap SECOND source Parquet table
- val secondParquetTablePath = tempDir.resolve("index/zorder/another-input-table").toAbsolutePath.toString
- val secondJSONTablePath = getClass.getClassLoader.getResource("index/zorder/another-input-table-json").toString
+ val colStatsDF = readColumnStatsIndex(spark, metadataTablePath)
+ val transposedColStatsDF = transposeColumnStatsIndex(spark, colStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
- bootstrapParquetInputTableFromJSON(secondJSONTablePath, secondParquetTablePath)
+ val expectedColStatsSchema = composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema)
- val secondCommitInstance = "1"
- val secondInputDf =
+ // Match against expected column stats table
+ val expectedColStatsIndexTableDf =
spark.read
- .schema(sourceTableSchema)
- .parquet(secondParquetTablePath)
-
- //
- // Update Column Stats table
- //
-
-// ColumnStatsIndexHelper.updateColumnStatsIndexFor(
-// secondInputDf.sparkSession,
-// sourceTableSchema,
-// secondInputDf.inputFiles.toSeq.asJava,
-// zorderedCols.asJava,
-// testZIndexPath.toString,
-// secondCommitInstance,
-// Seq(firstCommitInstance).asJava
-// )
-
- // NOTE: We don't need to provide schema upon reading from Parquet, since Spark will be able
- // to reliably retrieve it
- val mergedZIndexTable =
- spark.read
- .parquet(new Path(testZIndexPath, secondCommitInstance).toString)
-
- val expectedMergedZIndexTableDf =
+ .schema(expectedColStatsSchema)
+ .json(getClass.getClassLoader.getResource("index/zorder/column-stats-index-table.json").toString)
+
+ assertEquals(expectedColStatsIndexTableDf.schema, transposedColStatsDF.schema)
+ // NOTE: We have to drop the `fileName` column as it contains semi-random components
+ // that we can't control in this test. Nevertheless, since we manually verify composition of the
+ // ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue
+ assertEquals(asJson(sort(expectedColStatsIndexTableDf)), asJson(sort(transposedColStatsDF.drop("fileName"))))
+
+ // Collect Column Stats manually (reading individual Parquet files)
+ val manualColStatsTableDF =
+ buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames, expectedColStatsSchema)
+
+ assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(transposedColStatsDF)))
+
+ // Do an upsert and validate
+ val updateJSONTablePath = getClass.getClassLoader.getResource("index/zorder/another-input-table-json").toString
+ val updateDF = spark.read
+ .schema(sourceTableSchema)
+ .json(updateJSONTablePath)
+
+ updateDF.repartition(4)
+ .write
+ .format("hudi")
+ .options(opts)
+ .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024)
+ .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
+ val updatedColStatsDF = readColumnStatsIndex(spark, metadataTablePath)
+ val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
+
+ val expectedColStatsIndexUpdatedDF =
spark.read
- .schema(indexSchema)
- .json(getClass.getClassLoader.getResource("index/zorder/z-index-table-merged.json").toString)
+ .schema(expectedColStatsSchema)
+ .json(getClass.getClassLoader.getResource("index/zorder/updated-column-stats-index-table.json").toString)
- assertEquals(asJson(sort(expectedMergedZIndexTableDf)), asJson(sort(replace(mergedZIndexTable))))
+ assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema)
+ assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName"))))
+
+ // Collect Column Stats manually (reading individual Parquet files)
+ val manualUpdatedColStatsTableDF =
+ buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames, expectedColStatsSchema)
+
+ assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF)))
}
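
For context, the manual verification above amounts to folding Parquet row-group
statistics into a per-column range for each base file. A minimal sketch of that
idea, assuming parquet-hadoop on the classpath; the helper name
`footerColumnRanges` and the (min, max, nullCount) shape are illustrative and
not the actual `buildColumnStatsTableManually` implementation:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetFileReader
    import org.apache.parquet.hadoop.util.HadoopInputFile
    import scala.collection.JavaConverters._

    // Fold the row-group statistics of one Parquet file into a per-column
    // (min, max, nullCount) triple
    def footerColumnRanges(file: Path, conf: Configuration): Map[String, (Any, Any, Long)] = {
      val reader = ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))
      try {
        reader.getFooter.getBlocks.asScala
          .flatMap(_.getColumns.asScala)
          .groupBy(_.getPath.toDotString)
          .map { case (col, chunks) =>
            val stats = chunks.map(_.getStatistics)
            val mins = stats.map(_.genericGetMin.asInstanceOf[Comparable[Any]])
            val maxs = stats.map(_.genericGetMax.asInstanceOf[Comparable[Any]])
            col -> (
              mins.reduce((a, b) => if (a.compareTo(b) <= 0) a else b),
              maxs.reduce((a, b) => if (a.compareTo(b) >= 0) a else b),
              stats.map(_.getNumNulls).sum)
          }
      } finally {
        reader.close()
      }
    }
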
@Test
@@ -249,7 +210,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
while (it.hasNext) {
seq = seq :+ it.next()
}
- seq
+ seq.filter(fs => fs.getPath.getName.endsWith(".parquet"))
}
spark.createDataFrame(
@@ -296,23 +257,6 @@ class TestColumnStatsIndex extends HoodieClientTestBase {
fs.delete(new Path(targetParquetTablePath, "_SUCCESS"), false)
}
- def replace(ds: Dataset[Row]): DataFrame = {
- val uuidRegexp = "[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}"
-
- val uuids =
- ds.selectExpr(s"regexp_extract(file, '(${uuidRegexp})')")
- .distinct()
- .collect()
- .map(_.getString(0))
-
- val uuidToIdx: UserDefinedFunction = functions.udf((fileName: String) => {
- val uuid = uuids.find(uuid => fileName.contains(uuid)).get
- fileName.replace(uuid, "xxx")
- })
-
- ds.withColumn("file", uuidToIdx(ds("file")))
- }
-
private def generateRandomDataFrame(spark: SparkSession): DataFrame = {
val sourceTableSchema =
new StructType()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
index 918202e..ead6358 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
@@ -51,6 +51,8 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
val opts: Map[String, String] = commonOpts ++ Map(
HoodieMetadataConfig.ENABLE.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true",
+ HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key -> "true",
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
)
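
The two flags added above switch on the column-stats partition of the metadata
table and widen it to every column of the table. A minimal sketch of a write
exercising them, assuming an existing DataFrame `df` and target `basePath`;
the record-key, precombine, and table-name values are placeholders:

    import org.apache.hudi.common.config.HoodieMetadataConfig
    import org.apache.spark.sql.SaveMode

    df.write.format("hudi")
      .option("hoodie.table.name", "test_table")                // placeholder
      .option("hoodie.datasource.write.recordkey.field", "key") // placeholder
      .option("hoodie.datasource.write.precombine.field", "ts") // placeholder
      .option(HoodieMetadataConfig.ENABLE.key, "true")
      .option(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key, "true")
      .option(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS_FOR_ALL_COLUMNS.key, "true")
      .mode(SaveMode.Append)
      .save(basePath)
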
@@ -74,22 +76,29 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
.mode(SaveMode.Append)
.save(basePath)
- val metadataDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata")
+ // Files partition of MT
+ val filesPartitionDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/files")
// Smoke test
- metadataDF.show()
+ filesPartitionDF.show()
// Query w/ 0 requested columns should be working fine
- assertEquals(4, metadataDF.count())
+ assertEquals(4, filesPartitionDF.count())
val expectedKeys = Seq("2015/03/16", "2015/03/17", "2016/03/15", "__all_partitions__")
- val keys = metadataDF.select("key")
+ val keys = filesPartitionDF.select("key")
.collect()
.map(_.getString(0))
.toSeq
.sorted
assertEquals(expectedKeys, keys)
+
+ // Column Stats Index partition of MT
+ val colStatsDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/column_stats")
+
+ // Smoke test
+ colStatsDF.show()
}
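
Since the metadata table is itself a Hudi table under
`<basePath>/.hoodie/metadata`, the same direct reads work outside of a test
harness as well; a short sketch using the partition paths shown above:

    // Each MT partition can be loaded independently as a Hudi table
    val filesDF = spark.read.format("hudi").load(s"$basePath/.hoodie/metadata/files")
    val colStatsDF = spark.read.format("hudi").load(s"$basePath/.hoodie/metadata/column_stats")

    filesDF.select("key").show(false)
    println(colStatsDF.count())
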
private def parseRecords(records: Seq[String]) = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index fff92bc..2cdd788 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -27,10 +27,9 @@ import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieSparkUtils, HoodieUnsafeRDD}
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter
-import org.apache.spark.HoodieUnsafeRDDUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.{Dataset, Row, SaveMode}
+import org.apache.spark.sql.{Dataset, HoodieUnsafeRDDUtils, Row, SaveMode}
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
import org.junit.jupiter.api.{Tag, Test}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchmark.scala
index d6a2453..d84fad4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchmark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchmark.scala
@@ -19,8 +19,8 @@
package org.apache.spark.sql.execution.benchmark
import org.apache.hadoop.fs.Path
+import org.apache.hudi.ColumnStatsIndexHelper.buildColumnStatsTableFor
import org.apache.hudi.config.HoodieClusteringConfig.LayoutOptimizationStrategy
-import org.apache.hudi.index.columnstats.ColumnStatsIndexHelper
import org.apache.hudi.sort.SpaceCurveSortingHelper
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hudi.TestHoodieSqlBase
@@ -38,7 +38,7 @@ object SpaceCurveOptimizeBenchmark extends TestHoodieSqlBase {
val orderedColsTypes = Seq(StructField(co1, IntegerType), StructField(co2, IntegerType))
val colStatsIndexTable =
- ColumnStatsIndexHelper.buildColumnStatsTableFor(spark, sourceTableDF.inputFiles.toSeq, orderedColsTypes)
+ buildColumnStatsTableFor(spark, sourceTableDF.inputFiles.toSeq, orderedColsTypes)
.collect()
.map(f => (f.getInt(1), f.getInt(2), f.getInt(4), f.getInt(5)))
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystExpressionUtils.scala
index 3e23335..f81ff74 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystExpressionUtils.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystExpressionUtils.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql
-import org.apache.spark.HoodieSparkTypeUtils.isCastPreservingOrdering
+import HoodieSparkTypeUtils.isCastPreservingOrdering
import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Log, Log10, Log1p, Log2, Lower, Multiply, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper}
object HoodieSpark2CatalystExpressionUtils extends HoodieCatalystExpressionUtils {
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 31deb34..2673088 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -178,6 +178,11 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
val numFields = st.length
(getter, ordinal) => structConverter(getter.getStruct(ordinal, numFields))
+ case (st: StructType, UNION) =>
+ val unionConverter = newUnionConverter(st, avroType)
+ val numFields = st.length
+ (getter, ordinal) => unionConverter(getter.getStruct(ordinal, numFields))
+
case (MapType(kt, vt, valueContainsNull), MAP) if kt == StringType =>
val valueConverter = newConverter(
vt, resolveNullableType(avroType.getValueType, valueContainsNull))
@@ -205,8 +210,7 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
}
}
- private def newStructConverter(
- catalystStruct: StructType, avroStruct: Schema): InternalRow => Record = {
+ private def newStructConverter(catalystStruct: StructType, avroStruct: Schema): InternalRow => Record = {
if (avroStruct.getType != RECORD || avroStruct.getFields.size() != catalystStruct.length) {
throw new IncompatibleSchemaException(s"Cannot convert Catalyst type $catalystStruct to " +
s"Avro type $avroStruct.")
@@ -229,14 +233,58 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
result
}
+ private def newUnionConverter(catalystStruct: StructType, avroUnion: Schema): InternalRow => Any = {
+ if (avroUnion.getType != UNION || !canMapUnion(catalystStruct, avroUnion)) {
+ throw new IncompatibleSchemaException(s"Cannot convert Catalyst type $catalystStruct to " +
+ s"Avro type $avroUnion.")
+ }
+ val nullable = avroUnion.getTypes.size() > 0 && avroUnion.getTypes.get(0).getType == Type.NULL
+ val avroInnerTypes = if (nullable) {
+ avroUnion.getTypes.asScala.tail
+ } else {
+ avroUnion.getTypes.asScala
+ }
+ val fieldConverters = catalystStruct.zip(avroInnerTypes).map {
+ case (f1, f2) => newConverter(f1.dataType, f2)
+ }
+ val numFields = catalystStruct.length
+ (row: InternalRow) =>
+ var i = 0
+ var result: Any = null
+ while (i < numFields) {
+ if (!row.isNullAt(i)) {
+ if (result != null) {
+ throw new IncompatibleSchemaException(s"Cannot convert Catalyst record $catalystStruct to " +
+ s"Avro union $avroUnion. Record has more than one optional values set")
+ }
+ result = fieldConverters(i).apply(row, i)
+ }
+ i += 1
+ }
+ if (!nullable && result == null) {
+ throw new IncompatibleSchemaException(s"Cannot convert Catalyst record $catalystStruct to " +
+ s"Avro union $avroUnion. Record has no values set, while should have exactly one")
+ }
+ result
+ }
+
+ private def canMapUnion(catalystStruct: StructType, avroStruct: Schema): Boolean = {
+ (avroStruct.getTypes.size() > 0 &&
+ avroStruct.getTypes.get(0).getType == Type.NULL &&
+ avroStruct.getTypes.size() - 1 == catalystStruct.length) || avroStruct.getTypes.size() == catalystStruct.length
+ }
+
private def resolveNullableType(avroType: Schema, nullable: Boolean): Schema = {
if (nullable && avroType.getType != NULL) {
- // avro uses union to represent nullable type.
+ // Avro uses union to represent nullable type.
val fields = avroType.getTypes.asScala
- assert(fields.length == 2)
val actualType = fields.filter(_.getType != Type.NULL)
- assert(actualType.length == 1)
- actualType.head
+ if (fields.length == 2 && actualType.length == 1) {
+ actualType.head
+ } else {
+ // This is just a normal union, not used to designate nullability
+ avroType
+ }
} else {
avroType
}
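
The struct-over-union mapping introduced here follows the common Avro-to-Catalyst
convention of one struct field per non-null union branch, with at most one field
set per row. A small self-contained illustration of the shape check; the
`member0`/`member1` field names follow the usual spark-avro naming and are an
assumption, not something this patch dictates:

    import org.apache.avro.Schema
    import org.apache.avro.Schema.Type
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import scala.collection.JavaConverters._

    // An Avro union [null, int, string]...
    val union = Schema.createUnion(Seq(
      Schema.create(Type.NULL),
      Schema.create(Type.INT),
      Schema.create(Type.STRING)).asJava)

    // ...lines up with a two-field Catalyst struct, one field per non-null branch
    val catalyst = StructType(Seq(
      StructField("member0", IntegerType, nullable = true),
      StructField("member1", StringType, nullable = true)))

    // Mirrors the canMapUnion shape check added above: a leading NULL branch
    // only marks nullability and is not counted against the struct's arity
    val branches = union.getTypes.asScala
    val nullable = branches.head.getType == Type.NULL
    assert((if (nullable) branches.size - 1 else branches.size) == catalyst.length)
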
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark3_1CatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark3_1CatalystExpressionUtils.scala
index cb9c31f..3e65123 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark3_1CatalystExpressionUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark3_1CatalystExpressionUtils.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql
-import org.apache.spark.HoodieSparkTypeUtils.isCastPreservingOrdering
+import HoodieSparkTypeUtils.isCastPreservingOrdering
import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Log, Log10, Log1p, Log2, Lower, Multiply, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper}
object HoodieSpark3_1CatalystExpressionUtils extends HoodieCatalystExpressionUtils {
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index b423f9b..36d86c1 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -196,6 +196,11 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
val numFields = st.length
(getter, ordinal) => structConverter(getter.getStruct(ordinal, numFields))
+ case (st: StructType, UNION) =>
+ val unionConverter = newUnionConverter(st, avroType)
+ val numFields = st.length
+ (getter, ordinal) => unionConverter(getter.getStruct(ordinal, numFields))
+
case (MapType(kt, vt, valueContainsNull), MAP) if kt == StringType =>
val valueConverter = newConverter(
vt, resolveNullableType(avroType.getValueType, valueContainsNull))
@@ -223,8 +228,7 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
}
}
- private def newStructConverter(
- catalystStruct: StructType, avroStruct: Schema): InternalRow => Record = {
+ private def newStructConverter(catalystStruct: StructType, avroStruct: Schema): InternalRow => Record = {
if (avroStruct.getType != RECORD || avroStruct.getFields.size() != catalystStruct.length) {
throw new IncompatibleSchemaException(s"Cannot convert Catalyst type $catalystStruct to " +
s"Avro type $avroStruct.")
@@ -258,6 +262,47 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
result
}
+ private def newUnionConverter(catalystStruct: StructType, avroUnion: Schema): InternalRow => Any = {
+ if (avroUnion.getType != UNION || !canMapUnion(catalystStruct, avroUnion)) {
+ throw new IncompatibleSchemaException(s"Cannot convert Catalyst type $catalystStruct to " +
+ s"Avro type $avroUnion.")
+ }
+ val nullable = avroUnion.getTypes.size() > 0 && avroUnion.getTypes.get(0).getType == Type.NULL
+ val avroInnerTypes = if (nullable) {
+ avroUnion.getTypes.asScala.tail
+ } else {
+ avroUnion.getTypes.asScala
+ }
+ val fieldConverters = catalystStruct.zip(avroInnerTypes).map {
+ case (f1, f2) => newConverter(f1.dataType, f2)
+ }
+ val numFields = catalystStruct.length
+ (row: InternalRow) =>
+ var i = 0
+ var result: Any = null
+ while (i < numFields) {
+ if (!row.isNullAt(i)) {
+ if (result != null) {
+ throw new IncompatibleSchemaException(s"Cannot convert Catalyst record $catalystStruct to " +
+ s"Avro union $avroUnion. Record has more than one optional values set")
+ }
+ result = fieldConverters(i).apply(row, i)
+ }
+ i += 1
+ }
+ if (!nullable && result == null) {
+ throw new IncompatibleSchemaException(s"Cannot convert Catalyst record $catalystStruct to " +
+ s"Avro union $avroUnion. Record has no values set, while should have exactly one")
+ }
+ result
+ }
+
+ private def canMapUnion(catalystStruct: StructType, avroStruct: Schema): Boolean = {
+ (avroStruct.getTypes.size() > 0 &&
+ avroStruct.getTypes.get(0).getType == Type.NULL &&
+ avroStruct.getTypes.size() - 1 == catalystStruct.length) || avroStruct.getTypes.size() == catalystStruct.length
+ }
+
/**
* Resolve a possibly nullable Avro Type.
*
@@ -285,12 +330,12 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
if (avroType.getType == Type.UNION) {
val fields = avroType.getTypes.asScala
val actualType = fields.filter(_.getType != Type.NULL)
- if (fields.length != 2 || actualType.length != 1) {
- throw new UnsupportedAvroTypeException(
- s"Unsupported Avro UNION type $avroType: Only UNION of a null type and a non-null " +
- "type is supported")
+ if (fields.length == 2 && actualType.length == 1) {
+ (true, actualType.head)
+ } else {
+ // This is just a normal union, not used to designate nullability
+ (false, avroType)
}
- (true, actualType.head)
} else {
(false, avroType)
}
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/HoodieSpark3_2CatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/HoodieSpark3_2CatalystExpressionUtils.scala
index 8e056c0..fc8c957 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/HoodieSpark3_2CatalystExpressionUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/HoodieSpark3_2CatalystExpressionUtils.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql
-import org.apache.spark.HoodieSparkTypeUtils.isCastPreservingOrdering
+import HoodieSparkTypeUtils.isCastPreservingOrdering
import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Log, Log10, Log1p, Log2, Lower, Multiply, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper}
object HoodieSpark3_2CatalystExpressionUtils extends HoodieCatalystExpressionUtils {
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 2fe51d3..73267f4 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -211,6 +211,11 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
val numFields = st.length
(getter, ordinal) => structConverter(getter.getStruct(ordinal, numFields))
+ case (st: StructType, UNION) =>
+ val unionConverter = newUnionConverter(st, avroType, catalystPath, avroPath)
+ val numFields = st.length
+ (getter, ordinal) => unionConverter(getter.getStruct(ordinal, numFields))
+
case (MapType(kt, vt, valueContainsNull), MAP) if kt == StringType =>
val valueConverter = newConverter(
vt, resolveNullableType(avroType.getValueType, valueContainsNull),
@@ -288,6 +293,50 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
result
}
+ private def newUnionConverter(catalystStruct: StructType,
+ avroUnion: Schema,
+ catalystPath: Seq[String],
+ avroPath: Seq[String]): InternalRow => Any = {
+ if (avroUnion.getType != UNION || !canMapUnion(catalystStruct, avroUnion)) {
+ throw new IncompatibleSchemaException(s"Cannot convert Catalyst type $catalystStruct to " +
+ s"Avro type $avroUnion.")
+ }
+ val nullable = avroUnion.getTypes.size() > 0 && avroUnion.getTypes.get(0).getType == Type.NULL
+ val avroInnerTypes = if (nullable) {
+ avroUnion.getTypes.asScala.tail
+ } else {
+ avroUnion.getTypes.asScala
+ }
+ val fieldConverters = catalystStruct.zip(avroInnerTypes).map {
+ case (f1, f2) => newConverter(f1.dataType, f2, catalystPath, avroPath)
+ }
+ val numFields = catalystStruct.length
+ (row: InternalRow) =>
+ var i = 0
+ var result: Any = null
+ while (i < numFields) {
+ if (!row.isNullAt(i)) {
+ if (result != null) {
+ throw new IncompatibleSchemaException(s"Cannot convert Catalyst record $catalystStruct to " +
+ s"Avro union $avroUnion. Record has more than one optional values set")
+ }
+ result = fieldConverters(i).apply(row, i)
+ }
+ i += 1
+ }
+ if (!nullable && result == null) {
+ throw new IncompatibleSchemaException(s"Cannot convert Catalyst record $catalystStruct to " +
+ s"Avro union $avroUnion. Record has no values set, while should have exactly one")
+ }
+ result
+ }
+
+ private def canMapUnion(catalystStruct: StructType, avroStruct: Schema): Boolean = {
+ (avroStruct.getTypes.size() > 0 &&
+ avroStruct.getTypes.get(0).getType == Type.NULL &&
+ avroStruct.getTypes.size() - 1 == catalystStruct.length) || avroStruct.getTypes.size() == catalystStruct.length
+ }
+
/**
* Resolve a possibly nullable Avro Type.
*
@@ -315,12 +364,12 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
if (avroType.getType == Type.UNION) {
val fields = avroType.getTypes.asScala
val actualType = fields.filter(_.getType != Type.NULL)
- if (fields.length != 2 || actualType.length != 1) {
- throw new UnsupportedAvroTypeException(
- s"Unsupported Avro UNION type $avroType: Only UNION of a null type and a non-null " +
- "type is supported")
+ if (fields.length == 2 && actualType.length == 1) {
+ (true, actualType.head)
+ } else {
+ // This is just a normal union, not used to designate nullability
+ (false, avroType)
}
- (true, actualType.head)
} else {
(false, avroType)
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 5a11570..2f88809 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -51,6 +51,7 @@ import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.utilities.util.BloomFilterData;
import com.beust.jcommander.JCommander;
@@ -660,6 +661,7 @@ public class HoodieMetadataTableValidator implements Serializable {
}).collect(Collectors.toList());
}
+ @SuppressWarnings("rawtypes")
private void validateAllColumnStats(
HoodieMetadataValidationContext metadataTableBasedContext,
HoodieMetadataValidationContext fsBasedContext,
@@ -667,9 +669,9 @@ public class HoodieMetadataTableValidator implements Serializable {
Set<String> baseDataFilesForCleaning) {
List<String> latestBaseFilenameList = getLatestBaseFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning);
- List<HoodieColumnRangeMetadata<String>> metadataBasedColStats = metadataTableBasedContext
+ List<HoodieColumnRangeMetadata<Comparable>> metadataBasedColStats = metadataTableBasedContext
.getSortedColumnStatsList(partitionPath, latestBaseFilenameList);
- List<HoodieColumnRangeMetadata<String>> fsBasedColStats = fsBasedContext
+ List<HoodieColumnRangeMetadata<Comparable>> fsBasedColStats = fsBasedContext
.getSortedColumnStatsList(partitionPath, latestBaseFilenameList);
validate(metadataBasedColStats, fsBasedColStats, partitionPath, "column stats");
@@ -777,10 +779,10 @@ public class HoodieMetadataTableValidator implements Serializable {
}
public static class HoodieColumnRangeMetadataComparator
- implements Comparator<HoodieColumnRangeMetadata<String>>, Serializable {
+ implements Comparator<HoodieColumnRangeMetadata<Comparable>>, Serializable {
@Override
- public int compare(HoodieColumnRangeMetadata<String> o1, HoodieColumnRangeMetadata<String> o2) {
+ public int compare(HoodieColumnRangeMetadata<Comparable> o1, HoodieColumnRangeMetadata<Comparable> o2) {
return o1.toString().compareTo(o2.toString());
}
}
@@ -837,7 +839,8 @@ public class HoodieMetadataTableValidator implements Serializable {
.sorted(new HoodieFileGroupComparator()).collect(Collectors.toList());
}
- public List<HoodieColumnRangeMetadata<String>> getSortedColumnStatsList(
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public List<HoodieColumnRangeMetadata<Comparable>> getSortedColumnStatsList(
String partitionPath, List<String> baseFileNameList) {
LOG.info("All column names for getting column stats: " + allColumnNameList);
if (enableMetadataTable) {
@@ -846,15 +849,7 @@ public class HoodieMetadataTableValidator implements Serializable {
return allColumnNameList.stream()
.flatMap(columnName ->
tableMetadata.getColumnStats(partitionFileNameList, columnName).values().stream()
- .map(stats -> HoodieColumnRangeMetadata.create(
- stats.getFileName(),
- columnName,
- stats.getMinValue(),
- stats.getMaxValue(),
- stats.getNullCount(),
- stats.getValueCount(),
- stats.getTotalSize(),
- stats.getTotalUncompressedSize()))
+ .map(HoodieTableMetadataUtil::convertColumnStatsRecordToColumnRangeMetadata)
.collect(Collectors.toList())
.stream())
.sorted(new HoodieColumnRangeMetadataComparator())
@@ -865,18 +860,6 @@ public class HoodieMetadataTableValidator implements Serializable {
metaClient.getHadoopConf(),
new Path(new Path(metaClient.getBasePath(), partitionPath), filename),
allColumnNameList).stream())
- .map(rangeMetadata -> HoodieColumnRangeMetadata.create(
- rangeMetadata.getFilePath(),
- rangeMetadata.getColumnName(),
- // Note: here we ignore the type in the validation,
- // since column stats from metadata table store the min/max values as String
- rangeMetadata.getMinValue().toString(),
- rangeMetadata.getMaxValue().toString(),
- rangeMetadata.getNullCount(),
- rangeMetadata.getValueCount(),
- rangeMetadata.getTotalSize(),
- rangeMetadata.getTotalUncompressedSize()
- ))
.sorted(new HoodieColumnRangeMetadataComparator())
.collect(Collectors.toList());
}
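
Net effect of the validator changes: both the metadata-table path and the
filesystem path now yield HoodieColumnRangeMetadata<Comparable> through the same
converter, so validation reduces to a sort-then-compare over identically shaped
records. A hedged distillation of that idea; `ColRange` and `validateColumnStats`
are illustrative stand-ins, not the validator's actual API:

    // Both sources reduced to one shape, sorted with the same toString-based
    // ordering (as HoodieColumnRangeMetadataComparator does), then compared
    case class ColRange(file: String, column: String,
                        min: Comparable[_], max: Comparable[_],
                        nullCount: Long, valueCount: Long)

    def validateColumnStats(metadataBased: Seq[ColRange], fsBased: Seq[ColRange]): Unit = {
      val sortKey = (r: ColRange) => r.toString
      val a = metadataBased.sortBy(sortKey)
      val b = fsBased.sortBy(sortKey)
      require(a == b, s"Column stats mismatch:\n  metadata: $a\n  fs: $b")
    }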