You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2022/07/25 22:36:19 UTC
[hudi] branch master updated: [HUDI-4250][HUDI-4202] Optimize performance of Column Stats Index reading in Data Skipping (#5746)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e7c8df7e8b [HUDI-4250][HUDI-4202] Optimize performance of Column Stats Index reading in Data Skipping (#5746)
e7c8df7e8b is described below
commit e7c8df7e8b886261a9b0da7696a75e73eca04f11
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Mon Jul 25 15:36:12 2022 -0700
[HUDI-4250][HUDI-4202] Optimize performance of Column Stats Index reading in Data Skipping (#5746)
We provide an alternative way of fetching the Column Stats Index within the reading process itself, to avoid the penalty of the heavier-weight execution scheduled through the Spark engine.
---
.../scala/org/apache/hudi/util/JFunction.scala | 32 +-
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 9 +
.../functional/TestHoodieBackedMetadata.java | 6 +-
.../org/apache/hudi/BaseHoodieTableFileIndex.java | 12 +-
.../hudi/common/config/HoodieMetadataConfig.java | 28 ++
.../metadata/FileSystemBackedTableMetadata.java | 2 +-
.../hudi/metadata/HoodieBackedTableMetadata.java | 82 ++--
.../apache/hudi/metadata/HoodieTableMetadata.java | 3 +-
.../hudi/source/stats/ColumnStatsIndices.java | 2 +-
.../org/apache/hudi/ColumnStatsIndexSupport.scala | 481 ++++++++++++++-------
.../org/apache/hudi/HoodieCatalystUtils.scala | 65 +++
.../hudi/HoodieDatasetBulkInsertHelper.scala | 7 +-
.../scala/org/apache/hudi/HoodieDatasetUtils.scala | 45 --
.../scala/org/apache/hudi/HoodieFileIndex.scala | 100 ++---
...nsafeRDDUtils.scala => HoodieUnsafeUtils.scala} | 45 +-
.../org/apache/hudi/TestHoodieFileIndex.scala | 10 +-
.../hudi/functional/TestColumnStatsIndex.scala | 168 +++----
.../functional/TestParquetColumnProjection.scala | 4 +-
.../apache/spark/sql/adapter/Spark2Adapter.scala | 22 +
.../spark/sql/adapter/BaseSpark3Adapter.scala | 22 +
20 files changed, 744 insertions(+), 401 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
index 4a7dca8408..8a612f4da2 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/JFunction.scala
@@ -17,15 +17,43 @@
package org.apache.hudi.util
+import org.apache.hudi.common.function.{SerializableFunction, SerializablePairFunction}
+import org.apache.hudi.common.util.collection
+
+import scala.language.implicitConversions
+
/**
* Utility allowing for seamless conversion b/w Java/Scala functional primitives
*/
object JFunction {
- def toScala[T, R](f: java.util.function.Function[T, R]): T => R =
+ ////////////////////////////////////////////////////////////
+ // From Java to Scala
+ ////////////////////////////////////////////////////////////
+
+ implicit def toScala[T, R](f: java.util.function.Function[T, R]): T => R =
(t: T) => f.apply(t)
- def toJava[T](f: T => Unit): java.util.function.Consumer[T] =
+ ////////////////////////////////////////////////////////////
+ // From Scala to Java
+ ////////////////////////////////////////////////////////////
+
+ implicit def toJavaFunction[T, R](f: Function[T, R]): java.util.function.Function[T, R] =
+ new java.util.function.Function[T, R] {
+ override def apply(t: T): R = f.apply(t)
+ }
+
+ implicit def toJavaSerializableFunction[T, R](f: Function[T, R]): SerializableFunction[T, R] =
+ new SerializableFunction[T, R] {
+ override def apply(t: T): R = f.apply(t)
+ }
+
+ implicit def toJavaSerializablePairFunction[T, K, V](f: Function[T, collection.Pair[K, V]]): SerializablePairFunction[T, K, V] =
+ new SerializablePairFunction[T, K, V] {
+ override def call(t: T): collection.Pair[K, V] = f.apply(t)
+ }
+
+ implicit def toJava[T](f: T => Unit): java.util.function.Consumer[T] =
new java.util.function.Consumer[T] {
override def accept(t: T): Unit = f.apply(t)
}
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index cd30528798..24f4e6117a 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -27,12 +27,16 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.{FilePartition, LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, Row, SparkSession}
+import org.apache.spark.storage.StorageLevel
import java.util.Locale
@@ -138,4 +142,9 @@ trait SparkAdapter extends Serializable {
* TODO move to HoodieCatalystExpressionUtils
*/
def createInterpretedPredicate(e: Expression): InterpretedPredicate
+
+ /**
+ * Converts an instance of [[StorageLevel]] to its corresponding string representation
+ */
+ def convertStorageLevelToString(level: StorageLevel): String
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index e10c372be6..8828ceab6d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -1504,7 +1504,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
// prefix search for column (_hoodie_record_key)
ColumnIndexID columnIndexID = new ColumnIndexID(HoodieRecord.RECORD_KEY_METADATA_FIELD);
List<HoodieRecord<HoodieMetadataPayload>> result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString()),
- MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true).collectAsList();
// there are 3 partitions in total and 2 commits. total entries should be 6.
assertEquals(result.size(), 6);
@@ -1515,7 +1515,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
// prefix search for col(_hoodie_record_key) and first partition. only 2 files should be matched
PartitionIndexID partitionIndexID = new PartitionIndexID(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString())),
- MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true).collectAsList();
// 1 partition and 2 commits. total entries should be 2.
assertEquals(result.size(), 2);
result.forEach(entry -> {
@@ -1534,7 +1534,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
// prefix search for column {commit time} and first partition
columnIndexID = new ColumnIndexID(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
result = tableMetadata.getRecordsByKeyPrefixes(Collections.singletonList(columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString())),
- MetadataPartitionType.COLUMN_STATS.getPartitionPath()).collectAsList();
+ MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true).collectAsList();
// 1 partition and 2 commits. total entries should be 2.
assertEquals(result.size(), 2);
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index ec70653b9c..0b2c34618e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -38,6 +38,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.hadoop.CachingPath;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -62,7 +63,7 @@ import java.util.stream.Collectors;
* <li>Query instant/range</li>
* </ul>
*/
-public abstract class BaseHoodieTableFileIndex {
+public abstract class BaseHoodieTableFileIndex {
private static final Logger LOG = LogManager.getLogger(BaseHoodieTableFileIndex.class);
@@ -166,6 +167,11 @@ public abstract class BaseHoodieTableFileIndex {
.collect(Collectors.toMap(e -> e.getKey().path, Map.Entry::getValue));
}
+ public int getFileSlicesCount() {
+ return cachedAllInputFileSlices.values().stream()
+ .mapToInt(List::size).sum();
+ }
+
protected List<PartitionPath> getAllQueryPartitionPaths() {
List<String> queryRelativePartitionPaths = queryPaths.stream()
.map(path -> FSUtils.getRelativePartitionPath(new Path(basePath), path))
@@ -349,10 +355,10 @@ public abstract class BaseHoodieTableFileIndex {
Path fullPartitionPath(String basePath) {
if (!path.isEmpty()) {
- return new Path(basePath, path);
+ return new CachingPath(basePath, path);
}
- return new Path(basePath);
+ return new CachingPath(basePath);
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 2cd08b9ae9..b16373ef83 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -187,6 +187,26 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.sinceVersion("0.11.0")
.withDocumentation("Comma-separated list of columns for which column stats index will be built. If not set, all columns will be indexed");
+ public static final String COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY = "in-memory";
+ public static final String COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE = "engine";
+
+ public static final ConfigProperty<String> COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE = ConfigProperty
+ .key(METADATA_PREFIX + ".index.column.stats.processing.mode.override")
+ .noDefaultValue()
+ .withValidValues(COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY, COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE)
+ .sinceVersion("0.12.0")
+ .withDocumentation("By default Column Stats Index is automatically determining whether it should be read and processed either"
+ + "'in-memory' (w/in executing process) or using Spark (on a cluster), based on some factors like the size of the Index "
+ + "and how many columns are read. This config allows to override this behavior.");
+
+ public static final ConfigProperty<Integer> COLUMN_STATS_INDEX_IN_MEMORY_PROJECTION_THRESHOLD = ConfigProperty
+ .key(METADATA_PREFIX + ".index.column.stats.inMemory.projection.threshold")
+ .defaultValue(100000)
+ .sinceVersion("0.12.0")
+ .withDocumentation("When reading Column Stats Index, if the size of the expected resulting projection is below the in-memory"
+ + " threshold (counted by the # of rows), it will be attempted to be loaded \"in-memory\" (ie not using the execution engine"
+ + " like Spark, Flink, etc). If the value is above the threshold execution engine will be used to compose the projection.");
+
public static final ConfigProperty<String> BLOOM_FILTER_INDEX_FOR_COLUMNS = ConfigProperty
.key(METADATA_PREFIX + ".index.bloom.filter.column.list")
.noDefaultValue()
@@ -246,6 +266,14 @@ public final class HoodieMetadataConfig extends HoodieConfig {
return StringUtils.split(getString(COLUMN_STATS_INDEX_FOR_COLUMNS), CONFIG_VALUES_DELIMITER);
}
+ public String getColumnStatsIndexProcessingModeOverride() {
+ return getString(COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE);
+ }
+
+ public Integer getColumnStatsIndexInMemoryProjectionThreshold() {
+ return getIntOrDefault(COLUMN_STATS_INDEX_IN_MEMORY_PROJECTION_THRESHOLD);
+ }
+
public List<String> getColumnsEnabledForBloomFilterIndex() {
return StringUtils.split(getString(BLOOM_FILTER_INDEX_FOR_COLUMNS), CONFIG_VALUES_DELIMITER);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index f029995ba0..9877755b3c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -168,7 +168,7 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
}
@Override
- public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes, String partitionName) {
+ public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes, String partitionName, boolean shouldLoadInMemory) {
throw new HoodieMetadataException("Unsupported operation: getRecordsByKeyPrefixes!");
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index e96889f044..f8a0389da3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.FileSlice;
@@ -143,10 +144,11 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
@Override
public HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes,
- String partitionName) {
+ String partitionName,
+ boolean shouldLoadInMemory) {
// Sort the columns so that keys are looked up in order
- List<String> sortedkeyPrefixes = new ArrayList<>(keyPrefixes);
- Collections.sort(sortedkeyPrefixes);
+ List<String> sortedKeyPrefixes = new ArrayList<>(keyPrefixes);
+ Collections.sort(sortedKeyPrefixes);
// NOTE: Since we partition records to a particular file-group by full key, we will have
// to scan all file-groups for all key-prefixes as each of these might contain some
@@ -154,44 +156,44 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
List<FileSlice> partitionFileSlices =
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName);
- return engineContext.parallelize(partitionFileSlices)
- .flatMap(
- (SerializableFunction<FileSlice, Iterator<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>>) fileSlice -> {
- // NOTE: Since this will be executed by executors, we can't access previously cached
- // readers, and therefore have to always open new ones
- Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
- openReaders(partitionName, fileSlice);
- try {
- List<Long> timings = new ArrayList<>();
-
- HoodieFileReader baseFileReader = readers.getKey();
- HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
-
- if (baseFileReader == null && logRecordScanner == null) {
- // TODO: what do we do if both does not exist? should we throw an exception and let caller do the fallback ?
- return Collections.emptyIterator();
- }
-
- boolean fullKeys = false;
-
- Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
- readLogRecords(logRecordScanner, sortedkeyPrefixes, fullKeys, timings);
-
- List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords =
- readFromBaseAndMergeWithLogRecords(baseFileReader, sortedkeyPrefixes, fullKeys, logRecords, timings, partitionName);
-
- LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
- sortedkeyPrefixes.size(), timings));
-
- return mergedRecords.iterator();
- } catch (IOException ioe) {
- throw new HoodieIOException("Error merging records from metadata table for " + sortedkeyPrefixes.size() + " key : ", ioe);
- } finally {
- closeReader(readers);
- }
+ return (shouldLoadInMemory ? HoodieListData.lazy(partitionFileSlices) : engineContext.parallelize(partitionFileSlices))
+ .flatMap((SerializableFunction<FileSlice, Iterator<HoodieRecord<HoodieMetadataPayload>>>) fileSlice -> {
+ // NOTE: Since this will be executed by executors, we can't access previously cached
+ // readers, and therefore have to always open new ones
+ Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
+ openReaders(partitionName, fileSlice);
+
+ try {
+ List<Long> timings = new ArrayList<>();
+
+ HoodieFileReader baseFileReader = readers.getKey();
+ HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
+
+ if (baseFileReader == null && logRecordScanner == null) {
+ // TODO: what do we do if both does not exist? should we throw an exception and let caller do the fallback ?
+ return Collections.emptyIterator();
}
- )
- .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
+
+ boolean fullKeys = false;
+
+ Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords =
+ readLogRecords(logRecordScanner, sortedKeyPrefixes, fullKeys, timings);
+
+ List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords =
+ readFromBaseAndMergeWithLogRecords(baseFileReader, sortedKeyPrefixes, fullKeys, logRecords, timings, partitionName);
+
+ LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms",
+ sortedKeyPrefixes.size(), timings));
+
+ return mergedRecords.stream()
+ .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
+ .iterator();
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Error merging records from metadata table for " + sortedKeyPrefixes.size() + " key : ", ioe);
+ } finally {
+ closeReader(readers);
+ }
+ })
.filter(Objects::nonNull);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index a059b57845..ae871e3be0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -170,7 +170,8 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
* @return {@link HoodieData} of {@link HoodieRecord}s with records matching the passed in key prefixes.
*/
HoodieData<HoodieRecord<HoodieMetadataPayload>> getRecordsByKeyPrefixes(List<String> keyPrefixes,
- String partitionName);
+ String partitionName,
+ boolean shouldLoadInMemory);
/**
* Get the instant time to which the metadata is synced w.r.t data timeline.
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
index 8ec0eafde3..75e10341db 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
@@ -319,7 +319,7 @@ public class ColumnStatsIndices {
.map(colName -> new ColumnIndexID(colName).asBase64EncodedString()).collect(Collectors.toList());
HoodieData<HoodieRecord<HoodieMetadataPayload>> records =
- metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS);
+ metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, false);
org.apache.hudi.util.AvroToRowDataConverters.AvroToRowDataConverter converter =
AvroToRowDataConverters.createRowConverter((RowType) METADATA_DATA_TYPE.getLogicalType());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index b1e03f86ff..58511f791e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -17,66 +17,153 @@
package org.apache.hudi
-import org.apache.avro.Schema.Parser
-import org.apache.avro.generic.GenericRecord
-import org.apache.hudi.ColumnStatsIndexSupport.{composeIndexSchema, deserialize, metadataRecordSchemaString, metadataRecordStructType, tryUnpackNonNullVal}
+import org.apache.avro.Conversions.DecimalConversion
+import org.apache.avro.generic.GenericData
+import org.apache.hudi.ColumnStatsIndexSupport._
+import org.apache.hudi.HoodieCatalystUtils.{withPersistedData, withPersistedDataset}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.avro.model.HoodieMetadataRecord
+import org.apache.hudi.avro.model._
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.data.HoodieData
import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig
import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.common.util.collection
import org.apache.hudi.common.util.hash.ColumnIndexID
import org.apache.hudi.data.HoodieJavaRDD
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
+import org.apache.hudi.util.JFunction
import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows, createDataFrameFromRDD, createDataFrameFromRows}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, HoodieUnsafeRDDUtils, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.storage.StorageLevel
+import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import scala.collection.immutable.TreeSet
+import scala.collection.mutable.ListBuffer
+import scala.collection.parallel.mutable.ParHashMap
+
+class ColumnStatsIndexSupport(spark: SparkSession,
+ tableSchema: StructType,
+ @transient metadataConfig: HoodieMetadataConfig,
+ @transient metaClient: HoodieTableMetaClient,
+ allowCaching: Boolean = false) {
+
+ @transient private lazy val engineCtx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
+ @transient private lazy val metadataTable: HoodieTableMetadata =
+ HoodieTableMetadata.create(engineCtx, metadataConfig, metaClient.getBasePathV2.toString, FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
+
+ @transient private lazy val cachedColumnStatsIndexViews: ParHashMap[Seq[String], DataFrame] = ParHashMap()
+
+ // NOTE: Since [[metadataConfig]] is transient this has to be eagerly persisted, before this will be passed
+ // on to the executor
+ private val inMemoryProjectionThreshold = metadataConfig.getColumnStatsIndexInMemoryProjectionThreshold
+
+ private lazy val indexedColumns: Set[String] = {
+ val customIndexedColumns = metadataConfig.getColumnsEnabledForColumnStatsIndex
+ // Column Stats Index could index either
+ // - The whole table
+ // - Only configured columns
+ if (customIndexedColumns.isEmpty) {
+ tableSchema.fieldNames.toSet
+ } else {
+ customIndexedColumns.asScala.toSet
+ }
+ }
-/**
- * Mixin trait abstracting away heavy-lifting of interactions with Metadata Table's Column Stats Index,
- * providing convenient interfaces to read it, transpose, etc
- */
-trait ColumnStatsIndexSupport extends SparkAdapterSupport {
-
- def readColumnStatsIndex(spark: SparkSession,
- tableBasePath: String,
- metadataConfig: HoodieMetadataConfig,
- targetColumns: Seq[String] = Seq.empty): DataFrame = {
- val targetColStatsIndexColumns = Seq(
- HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
- HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
- HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
- HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT,
- HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
-
- val requiredMetadataIndexColumns =
- (targetColStatsIndexColumns :+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME).map(colName =>
- s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}")
-
- val metadataTableDF: DataFrame = {
- // NOTE: If specific columns have been provided, we can considerably trim down amount of data fetched
- // by only fetching Column Stats Index records pertaining to the requested columns.
- // Otherwise we fallback to read whole Column Stats Index
- if (targetColumns.nonEmpty) {
- readColumnStatsIndexForColumnsInternal(spark, targetColumns, metadataConfig, tableBasePath)
- } else {
- readFullColumnStatsIndexInternal(spark, metadataConfig, tableBasePath)
- }
+ /**
+ * Returns true when the Column Stats Index is built and available as a standalone partition
+ * within the Metadata Table
+ */
+ def isIndexAvailable: Boolean = {
+ checkState(metadataConfig.enabled, "Metadata Table support has to be enabled")
+ metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
+ }
+
+ /**
+ * Determines whether it would be more efficient to read the Column Stats Index a) in memory, within the invoking process,
+ * or b) on-cluster, via Spark [[Dataset]] and [[RDD]] APIs
+ */
+ def shouldReadInMemory(fileIndex: HoodieFileIndex, queryReferencedColumns: Seq[String]): Boolean = {
+ Option(metadataConfig.getColumnStatsIndexProcessingModeOverride) match {
+ case Some(mode) =>
+ mode == HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY
+ case None =>
+ fileIndex.getFileSlicesCount * queryReferencedColumns.length < inMemoryProjectionThreshold
+ }
+ }
+
+ /**
+ * Loads a view of the Column Stats Index in a transposed format, where a single row coalesces every column's
+ * statistics for a single file, returning it as [[DataFrame]]
+ *
+ * Please check out scala-doc of the [[transpose]] method explaining this view in more details
+ */
+ def loadTransposed[T](targetColumns: Seq[String], shouldReadInMemory: Boolean)(block: DataFrame => T): T = {
+ cachedColumnStatsIndexViews.get(targetColumns) match {
+ case Some(cachedDF) =>
+ block(cachedDF)
+
+ case None =>
+ val colStatsRecords: HoodieData[HoodieMetadataColumnStats] =
+ loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
+
+ withPersistedData(colStatsRecords, StorageLevel.MEMORY_ONLY) {
+ val (transposedRows, indexSchema) = transpose(colStatsRecords, targetColumns)
+ val df = if (shouldReadInMemory) {
+ // NOTE: This will instantiate a [[Dataset]] backed by [[LocalRelation]] holding all of the rows
+ // of the transposed table in memory, facilitating execution of the subsequently chained operations
+ // on it locally (on the driver; all such operations are actually going to be performed by Spark's
+ // Optimizer)
+ createDataFrameFromRows(spark, transposedRows.collectAsList().asScala, indexSchema)
+ } else {
+ val rdd = HoodieJavaRDD.getJavaRDD(transposedRows)
+ spark.createDataFrame(rdd, indexSchema)
+ }
+
+ if (allowCaching) {
+ cachedColumnStatsIndexViews.put(targetColumns, df)
+ // NOTE: Instead of collecting the rows from the index and holding them in memory, we rely
+ // on Spark as a (potentially distributed) cache managing the data lifecycle, while we simply keep
+ // a reference to the persisted [[DataFrame]] instance
+ df.persist(StorageLevel.MEMORY_ONLY)
+
+ block(df)
+ } else {
+ withPersistedDataset(df) {
+ block(df)
+ }
+ }
+ }
}
+ }
- val colStatsDF = metadataTableDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
- .select(requiredMetadataIndexColumns.map(col): _*)
+ /**
+ * Loads a view of the Column Stats Index in a raw format, returning it as [[DataFrame]]
+ *
+ * Please check out scala-doc of the [[transpose]] method explaining this view in more details
+ */
+ def load(targetColumns: Seq[String] = Seq.empty, shouldReadInMemory: Boolean = false): DataFrame = {
+ // NOTE: If specific columns have been provided, we can considerably trim down amount of data fetched
+ // by only fetching Column Stats Index records pertaining to the requested columns.
+ // Otherwise we fallback to read whole Column Stats Index
+ if (targetColumns.nonEmpty) {
+ loadColumnStatsIndexForColumnsInternal(targetColumns, shouldReadInMemory)
+ } else {
+ loadFullColumnStatsIndexInternal()
+ }
+ }
- colStatsDF
+ def invalidateCaches(): Unit = {
+ cachedColumnStatsIndexViews.foreach { case (_, df) => df.unpersist() }
+ cachedColumnStatsIndexViews.clear()
}
/**
@@ -112,154 +199,184 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
* column references from the filtering expressions, and only transpose records corresponding to the
* columns referenced in those
*
- * @param spark Spark session ref
- * @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table
+ * @param colStatsRecords [[HoodieData[HoodieMetadataColumnStats]]] bearing raw Column Stats Index records
* @param queryColumns target columns to be included into the final table
- * @param tableSchema schema of the source data table
* @return reshaped table according to the format outlined above
*/
- def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, queryColumns: Seq[String], tableSchema: StructType): DataFrame = {
- val colStatsSchema = colStatsDF.schema
- val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({
- case (field, ordinal) => (field.name, ordinal)
- }).toMap
-
+ private def transpose(colStatsRecords: HoodieData[HoodieMetadataColumnStats], queryColumns: Seq[String]): (HoodieData[Row], StructType) = {
val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap
-
- val colNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
- val minValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
- val maxValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
- val fileNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
- val nullCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
- val valueCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)
-
- // NOTE: We have to collect list of indexed columns to make sure we properly align the rows
- // w/in the transposed dataset: since some files might not have all of the columns indexed
- // either due to the Column Stats Index config changes, schema evolution, etc, we have
- // to make sure that all of the rows w/in transposed data-frame are properly padded (with null
- // values) for such file-column combinations
- val indexedColumns: Seq[String] = colStatsDF.rdd.map(row => row.getString(colNameOrdinal)).distinct().collect()
-
// NOTE: We're sorting the columns to make sure final index schema matches layout
// of the transposed table
- val sortedTargetColumns = TreeSet(queryColumns.intersect(indexedColumns): _*)
-
- val transposedRDD = colStatsDF.rdd
- .filter(row => sortedTargetColumns.contains(row.getString(colNameOrdinal)))
- .map { row =>
- if (row.isNullAt(minValueOrdinal) && row.isNullAt(maxValueOrdinal)) {
+ val sortedTargetColumnsSet = TreeSet(queryColumns:_*)
+ val sortedTargetColumns = sortedTargetColumnsSet.toSeq
+
+ // NOTE: This is a trick to avoid pulling all of [[ColumnStatsIndexSupport]] object into the lambdas'
+ // closures below
+ val indexedColumns = this.indexedColumns
+
+ // Here we perform complex transformation which requires us to modify the layout of the rows
+ // of the dataset, and therefore we rely on low-level RDD API to avoid incurring encoding/decoding
+ // penalty of the [[Dataset]], since it's required to adhere to its schema at all times, while
+ // RDDs are not;
+ val transposedRows: HoodieData[Row] = colStatsRecords
+ // NOTE: Explicit conversion is required for Scala 2.11
+ .filter(JFunction.toJavaSerializableFunction(r => sortedTargetColumnsSet.contains(r.getColumnName)))
+ .mapToPair(JFunction.toJavaSerializablePairFunction(r => {
+ if (r.getMinValue == null && r.getMaxValue == null) {
// Corresponding row could be null in either of the 2 cases
// - Column contains only null values (in that case both min/max have to be nulls)
// - This is a stubbed Column Stats record (used as a tombstone)
- row
+ collection.Pair.of(r.getFileName, r)
} else {
- val minValueStruct = row.getAs[Row](minValueOrdinal)
- val maxValueStruct = row.getAs[Row](maxValueOrdinal)
+ val minValueWrapper = r.getMinValue
+ val maxValueWrapper = r.getMaxValue
- checkState(minValueStruct != null && maxValueStruct != null, "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null")
+ checkState(minValueWrapper != null && maxValueWrapper != null, "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null")
- val colName = row.getString(colNameOrdinal)
+ val colName = r.getColumnName
val colType = tableSchemaFieldMap(colName).dataType
- val (minValue, _) = tryUnpackNonNullVal(minValueStruct)
- val (maxValue, _) = tryUnpackNonNullVal(maxValueStruct)
- val rowValsSeq = row.toSeq.toArray
+ val minValue = deserialize(tryUnpackValueWrapper(minValueWrapper), colType)
+ val maxValue = deserialize(tryUnpackValueWrapper(maxValueWrapper), colType)
+
// Update min-/max-value structs w/ unwrapped values in-place
- rowValsSeq(minValueOrdinal) = deserialize(minValue, colType)
- rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType)
+ r.setMinValue(minValue)
+ r.setMaxValue(maxValue)
- Row(rowValsSeq: _*)
+ collection.Pair.of(r.getFileName, r)
}
- }
- .groupBy(r => r.getString(fileNameOrdinal))
- .foldByKey(Seq[Row]()) {
- case (_, columnRowsSeq) =>
- // Rows seq is always non-empty (otherwise it won't be grouped into)
- val fileName = columnRowsSeq.head.get(fileNameOrdinal)
- val valueCount = columnRowsSeq.head.get(valueCountOrdinal)
-
- // To properly align individual rows (corresponding to a file) w/in the transposed projection, we need
- // to align existing column-stats for individual file with the list of expected ones for the
- // whole transposed projection (a superset of all files)
- val columnRowsMap = columnRowsSeq.map(row => (row.getString(colNameOrdinal), row)).toMap
- val alignedColumnRowsSeq = sortedTargetColumns.toSeq.map(columnRowsMap.get)
-
- val coalescedRowValuesSeq =
- alignedColumnRowsSeq.foldLeft(Seq[Any](fileName, valueCount)) {
- case (acc, opt) =>
- opt match {
- case Some(columnStatsRow) =>
- acc ++ Seq(minValueOrdinal, maxValueOrdinal, nullCountOrdinal).map(ord => columnStatsRow.get(ord))
- case None =>
- // NOTE: Since we're assuming missing column to essentially contain exclusively
- // null values, we set null-count to be equal to value-count (this behavior is
- // consistent with reading non-existent columns from Parquet)
- acc ++ Seq(null, null, valueCount)
- }
- }
-
- Seq(Row(coalescedRowValuesSeq:_*))
- }
- .values
- .flatMap(it => it)
+ }))
+ .groupByKey()
+ .map(JFunction.toJavaSerializableFunction(p => {
+ val columnRecordsSeq: Seq[HoodieMetadataColumnStats] = p.getValue.asScala.toSeq
+ val fileName: String = p.getKey
+ val valueCount: Long = columnRecordsSeq.head.getValueCount
+
+ // To properly align individual rows (corresponding to a file) w/in the transposed projection, we need
+ // to align existing column-stats for individual file with the list of expected ones for the
+ // whole transposed projection (a superset of all files)
+ val columnRecordsMap = columnRecordsSeq.map(r => (r.getColumnName, r)).toMap
+ val alignedColStatRecordsSeq = sortedTargetColumns.map(columnRecordsMap.get)
+
+ val coalescedRowValuesSeq =
+ alignedColStatRecordsSeq.foldLeft(ListBuffer[Any](fileName, valueCount)) {
+ case (acc, opt) =>
+ opt match {
+ case Some(colStatRecord) =>
+ acc ++= Seq(colStatRecord.getMinValue, colStatRecord.getMaxValue, colStatRecord.getNullCount)
+ case None =>
+ // NOTE: This could occur in either of the following cases:
+ // 1. Column is not indexed in Column Stats Index: in this case we won't be returning
+ // any statistics for such column (ie all stats will be null)
+ // 2. Particular file does not have this particular column (which is indexed by Column Stats Index):
+ // in this case we're assuming missing column to essentially contain exclusively
+ // null values, we set min/max values as null and null-count to be equal to value-count (this
+ // behavior is consistent with reading non-existent columns from Parquet)
+ //
+ // This is a way to determine current column's index without explicit iteration (we're adding 3 stats / column)
+ val idx = acc.length / 3
+ val colName = sortedTargetColumns(idx)
+ val indexed = indexedColumns.contains(colName)
+
+ val nullCount = if (indexed) valueCount else null
+
+ acc ++= Seq(null, null, nullCount)
+ }
+ }
+
+ Row(coalescedRowValuesSeq:_*)
+ }))
// NOTE: It's crucial to maintain appropriate ordering of the columns
// matching table layout: hence, we cherry-pick individual columns
// instead of simply filtering in the ones we're interested in the schema
- val indexSchema = composeIndexSchema(sortedTargetColumns.toSeq, tableSchema)
+ val indexSchema = composeIndexSchema(sortedTargetColumns, tableSchema)
+ (transposedRows, indexSchema)
+ }
- spark.createDataFrame(transposedRDD, indexSchema)
+ private def loadColumnStatsIndexForColumnsInternal(targetColumns: Seq[String], shouldReadInMemory: Boolean): DataFrame = {
+ val colStatsDF = {
+ val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
+ // NOTE: Explicit conversion is required for Scala 2.11
+ val catalystRows: HoodieData[InternalRow] = colStatsRecords.mapPartitions(JFunction.toJavaSerializableFunction(it => {
+ val converter = AvroConversionUtils.createAvroToInternalRowConverter(HoodieMetadataColumnStats.SCHEMA$, columnStatsRecordStructType)
+ it.asScala.map(r => converter(r).orNull).asJava
+ }), false)
+
+ if (shouldReadInMemory) {
+ // NOTE: This will instantiate a [[Dataset]] backed by [[LocalRelation]] holding all of the rows
+ // of the Column Stats Index in memory, facilitating execution of the subsequently chained operations
+ // on it locally (on the driver; all such operations are actually going to be performed by Spark's
+ // Optimizer)
+ createDataFrameFromInternalRows(spark, catalystRows.collectAsList().asScala, columnStatsRecordStructType)
+ } else {
+ createDataFrameFromRDD(spark, HoodieJavaRDD.getJavaRDD(catalystRows), columnStatsRecordStructType)
+ }
+ }
+
+ colStatsDF.select(targetColumnStatsIndexColumns.map(col): _*)
}
- private def readFullColumnStatsIndexInternal(spark: SparkSession, metadataConfig: HoodieMetadataConfig, tableBasePath: String): DataFrame = {
- val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(tableBasePath)
+ private def loadColumnStatsIndexRecords(targetColumns: Seq[String], shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = {
+ // Read Metadata Table's Column Stats Index records into [[HoodieData]] container by
+ // - Fetching the records from CSI by key-prefixes (encoded column names)
+ // - Extracting [[HoodieMetadataColumnStats]] records
+ // - Filtering out nulls
+ checkState(targetColumns.nonEmpty)
+
+ // TODO encoding should be done internally w/in HoodieBackedTableMetadata
+ val encodedTargetColumnNames = targetColumns.map(colName => new ColumnIndexID(colName).asBase64EncodedString())
+
+ val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
+ metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory)
+
+ val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
+ // NOTE: Explicit conversion is required for Scala 2.11
+ metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
+ toScalaOption(record.getData.getInsertValue(null, null))
+ .map(metadataRecord => metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata)
+ .orNull
+ }))
+ .filter(JFunction.toJavaSerializableFunction(columnStatsRecord => columnStatsRecord != null))
+
+ columnStatsRecords
+ }
+
+ private def loadFullColumnStatsIndexInternal(): DataFrame = {
+ val metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePathV2.toString)
// Read Metadata Table's Column Stats Index into Spark's [[DataFrame]]
- spark.read.format("org.apache.hudi")
+ val colStatsDF = spark.read.format("org.apache.hudi")
.options(metadataConfig.getProps.asScala)
.load(s"$metadataTablePath/${MetadataPartitionType.COLUMN_STATS.getPartitionPath}")
- }
-
- private def readColumnStatsIndexForColumnsInternal(spark: SparkSession, targetColumns: Seq[String], metadataConfig: HoodieMetadataConfig, tableBasePath: String) = {
- val ctx = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
- // Read Metadata Table's Column Stats Index into Spark's [[DataFrame]] by
- // - Fetching the records from CSI by key-prefixes (encoded column names)
- // - Deserializing fetched records into [[InternalRow]]s
- // - Composing [[DataFrame]]
- val metadataTableDF = {
- val metadataTable = HoodieTableMetadata.create(ctx, metadataConfig, tableBasePath, FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue)
-
- // TODO encoding should be done internally w/in HoodieBackedTableMetadata
- val encodedTargetColumnNames = targetColumns.map(colName => new ColumnIndexID(colName).asBase64EncodedString())
-
- val recordsRDD: RDD[HoodieRecord[HoodieMetadataPayload]] =
- HoodieJavaRDD.getJavaRDD(
- metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames.asJava, HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
- )
-
- val catalystRowsRDD: RDD[InternalRow] = recordsRDD.mapPartitions { it =>
- val metadataRecordSchema = new Parser().parse(metadataRecordSchemaString)
- val converter = AvroConversionUtils.createAvroToInternalRowConverter(metadataRecordSchema, metadataRecordStructType)
-
- it.map { record =>
- // schema and props are ignored for generating metadata record from the payload
- // instead, the underlying file system, or bloom filter, or columns stats metadata (part of payload) are directly used
- toScalaOption(record.getData.getInsertValue(null, null))
- .flatMap(avroRecord => converter(avroRecord.asInstanceOf[GenericRecord]))
- .orNull
- }
- }
+ val requiredIndexColumns =
+ targetColumnStatsIndexColumns.map(colName =>
+ col(s"${HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS}.${colName}"))
- HoodieUnsafeRDDUtils.createDataFrame(spark, catalystRowsRDD, metadataRecordStructType)
- }
- metadataTableDF
+ colStatsDF.where(col(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).isNotNull)
+ .select(requiredIndexColumns: _*)
}
}
object ColumnStatsIndexSupport {
- private val metadataRecordSchemaString: String = HoodieMetadataRecord.SCHEMA$.toString
- private val metadataRecordStructType: StructType = AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataRecord.SCHEMA$)
+ private val expectedAvroSchemaValues = Set("BooleanWrapper", "IntWrapper", "LongWrapper", "FloatWrapper", "DoubleWrapper",
+ "BytesWrapper", "StringWrapper", "DateWrapper", "DecimalWrapper", "TimeMicrosWrapper", "TimestampMicrosWrapper")
+
+ /**
+ * Target Column Stats Index columns which internally are mapped onto fields of the corresponding
+ * Column Stats record payload ([[HoodieMetadataColumnStats]]) persisted w/in Metadata Table
+ */
+ private val targetColumnStatsIndexColumns = Seq(
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT,
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT,
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME
+ )
+
+ private val columnStatsRecordStructType: StructType = AvroConversionUtils.convertAvroSchemaToStructType(HoodieMetadataColumnStats.SCHEMA$)
/**
* @VisibleForTesting
@@ -300,13 +417,28 @@ object ColumnStatsIndexSupport {
@inline private def composeColumnStatStructType(col: String, statName: String, dataType: DataType) =
StructField(formatColName(col, statName), dataType, nullable = true, Metadata.empty)
- private def tryUnpackNonNullVal(statStruct: Row): (Any, Int) =
- statStruct.toSeq.zipWithIndex
- .find(_._1 != null)
- // NOTE: First non-null value will be a wrapper (converted into Row), bearing a single
- // value
- .map { case (value, ord) => (value.asInstanceOf[Row].get(0), ord)}
- .getOrElse((null, -1))
+ private def tryUnpackValueWrapper(valueWrapper: AnyRef): Any = {
+ valueWrapper match {
+ case w: BooleanWrapper => w.getValue
+ case w: IntWrapper => w.getValue
+ case w: LongWrapper => w.getValue
+ case w: FloatWrapper => w.getValue
+ case w: DoubleWrapper => w.getValue
+ case w: BytesWrapper => w.getValue
+ case w: StringWrapper => w.getValue
+ case w: DateWrapper => w.getValue
+ case w: DecimalWrapper => w.getValue
+ case w: TimeMicrosWrapper => w.getValue
+ case w: TimestampMicrosWrapper => w.getValue
+
+ case r: GenericData.Record if expectedAvroSchemaValues.contains(r.getSchema.getName) =>
+ r.get("value")
+
+ case _ => throw new UnsupportedOperationException(s"Not recognized value wrapper type (${valueWrapper.getClass.getSimpleName})")
+ }
+ }
+
+ val decConv = new DecimalConversion()
private def deserialize(value: Any, dataType: DataType): Any = {
dataType match {
@@ -315,12 +447,37 @@ object ColumnStatsIndexSupport {
// here we have to decode those back into corresponding logical representation.
case TimestampType => DateTimeUtils.toJavaTimestamp(value.asInstanceOf[Long])
case DateType => DateTimeUtils.toJavaDate(value.asInstanceOf[Int])
-
+ // Standard types
+ case StringType => value
+ case BooleanType => value
+ // Numeric types
+ case FloatType => value
+ case DoubleType => value
+ case LongType => value
+ case IntegerType => value
// NOTE: All integral types of size less than Int are encoded as Ints in MT
case ShortType => value.asInstanceOf[Int].toShort
case ByteType => value.asInstanceOf[Int].toByte
- case _ => value
+ // TODO fix
+ case _: DecimalType =>
+ value match {
+ case buffer: ByteBuffer =>
+ val logicalType = DecimalWrapper.SCHEMA$.getField("value").schema().getLogicalType
+ decConv.fromBytes(buffer, null, logicalType)
+ case _ => value
+ }
+ case BinaryType =>
+ value match {
+ case b: ByteBuffer =>
+ val bytes = new Array[Byte](b.remaining)
+ b.get(bytes)
+ bytes
+ case other => other
+ }
+
+ case _ =>
+ throw new UnsupportedOperationException(s"Data type for the statistic value is not recognized $dataType")
}
}
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCatalystUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCatalystUtils.scala
new file mode 100644
index 0000000000..0f41dc1fff
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCatalystUtils.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.data.HoodieData
+import org.apache.spark.sql.Dataset
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.StorageLevel._
+
+object HoodieCatalystUtils extends SparkAdapterSupport {
+
+ /**
+ * Executes provided function while keeping provided [[Dataset]] instance persisted for the
+ * duration of the execution
+ *
+ * @param df target [[Dataset]] to be persisted
+ * @param level desired [[StorageLevel]] of the persistence
+ * @param f target function to be executed while [[Dataset]] is kept persisted
+ * @tparam T return value of the target function
+ * @return execution outcome of the [[f]] function
+ */
+ def withPersistedDataset[T](df: Dataset[_], level: StorageLevel = MEMORY_AND_DISK)(f: => T): T = {
+ df.persist(level)
+ try {
+ f
+ } finally {
+ df.unpersist()
+ }
+ }
+
+ /**
+ * Executes provided function while keeping provided [[HoodieData]] instance persisted for the
+ * duration of the execution
+ *
+ * @param data target [[HoodieData]] to be persisted
+ * @param level desired [[StorageLevel]] of the persistence
+ * @param f target function to be executed while [[HoodieData]] is kept persisted
+ * @tparam T return value of the target function
+ * @return execution outcome of the [[f]] function
+ */
+ def withPersistedData[T](data: HoodieData[_], level: StorageLevel = MEMORY_AND_DISK)(f: => T): T = {
+ data.persist(sparkAdapter.convertStorageLevelToString(level))
+ try {
+ f
+ } finally {
+ data.unpersist()
+ }
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index dad7c17650..09f1fac2c8 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -27,11 +27,10 @@ import org.apache.hudi.keygen.{BuiltinKeyGenerator, SparkKeyGeneratorInterface}
import org.apache.hudi.table.BulkInsertPartitioner
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.HoodieUnsafeRDDUtils.createDataFrame
import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeRDDUtils, Row}
+import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeUtils, Row}
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters.asScalaBufferConverter
@@ -92,9 +91,9 @@ object HoodieDatasetBulkInsertHelper extends Logging {
val updatedDF = if (populateMetaFields && config.shouldCombineBeforeInsert) {
val dedupedRdd = dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config))
- HoodieUnsafeRDDUtils.createDataFrame(df.sparkSession, dedupedRdd, updatedSchema)
+ HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, updatedSchema)
} else {
- HoodieUnsafeRDDUtils.createDataFrame(df.sparkSession, prependedRdd, updatedSchema)
+ HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, prependedRdd, updatedSchema)
}
val trimmedDF = if (shouldDropPartitionColumns) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetUtils.scala
deleted file mode 100644
index a6c689610b..0000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetUtils.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
-
-object HoodieDatasetUtils {
-
- /**
- * Executes provided function while keeping provided [[DataFrame]] instance persisted for the
- * duration of the execution
- *
- * @param df target [[DataFrame]] to be persisted
- * @param level desired [[StorageLevel]] of the persistence
- * @param f target function to be executed while [[DataFrame]] is kept persisted
- * @tparam T return value of the target function
- * @return execution outcome of the [[f]] function
- */
- def withPersistence[T](df: DataFrame, level: StorageLevel = MEMORY_AND_DISK)(f: => T): T = {
- df.persist(level)
- try {
- f
- } finally {
- df.unpersist()
- }
- }
-}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index bd15cb250f..4e158aaa86 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -18,7 +18,6 @@
package org.apache.hudi
import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hudi.HoodieDatasetUtils.withPersistence
import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, collectReferencedColumns, getConfigProperties}
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -26,7 +25,7 @@ import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
-import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadataUtil}
+import org.apache.hudi.metadata.HoodieMetadataPayload
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal}
@@ -35,7 +34,7 @@ import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndex
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import java.text.SimpleDateFormat
@@ -80,8 +79,9 @@ case class HoodieFileIndex(spark: SparkSession,
specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
fileStatusCache = fileStatusCache
)
- with FileIndex
- with ColumnStatsIndexSupport {
+ with FileIndex {
+
+ @transient private lazy val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
override def rootPaths: Seq[Path] = queryPaths.asScala
@@ -95,8 +95,9 @@ case class HoodieFileIndex(spark: SparkSession,
*/
def allFiles: Seq[FileStatus] = {
cachedAllInputFileSlices.values.asScala.flatMap(_.asScala)
- .filter(_.getBaseFile.isPresent)
- .map(_.getBaseFile.get().getFileStatus)
+ .map(fs => fs.getBaseFile.orElse(null))
+ .filter(_ != null)
+ .map(_.getFileStatus)
.toSeq
}
@@ -196,64 +197,63 @@ case class HoodieFileIndex(spark: SparkSession,
// nothing CSI in particular could be applied for)
lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
- if (!isMetadataTableEnabled || !isColumnStatsIndexAvailable || !isDataSkippingEnabled) {
+ if (!isMetadataTableEnabled || !isDataSkippingEnabled || !columnStatsIndex.isIndexAvailable) {
validateConfig()
Option.empty
} else if (queryFilters.isEmpty || queryReferencedColumns.isEmpty) {
Option.empty
} else {
- val colStatsDF: DataFrame = readColumnStatsIndex(spark, basePath, metadataConfig, queryReferencedColumns)
-
- // Persist DF to avoid re-computing column statistics unraveling
- withPersistence(colStatsDF) {
- val transposedColStatsDF: DataFrame = transposeColumnStatsIndex(spark, colStatsDF, queryReferencedColumns, schema)
-
- // Persist DF to avoid re-computing column statistics unraveling
- withPersistence(transposedColStatsDF) {
- val indexSchema = transposedColStatsDF.schema
- val indexFilter =
- queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema))
- .reduce(And)
-
- val allIndexedFileNames =
- transposedColStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
- .collect()
- .map(_.getString(0))
- .toSet
-
- val prunedCandidateFileNames =
- transposedColStatsDF.where(new Column(indexFilter))
- .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
- .collect()
- .map(_.getString(0))
- .toSet
-
- // NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every
- // base-file: since it's bound to clustering, which could occur asynchronously
- // at arbitrary point in time, and is not likely to be touching all of the base files.
- //
- // To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
- // files and all outstanding base-files, and make sure that all base files not
- // represented w/in the index are included in the output of this method
- val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames)
-
- Some(prunedCandidateFileNames ++ notIndexedFileNames)
- }
+ // NOTE: Since executing on-cluster via Spark API has its own non-trivial amount of overhead,
+ // it's usually preferable to fetch Column Stats Index w/in the same process (usually driver),
+ // w/o resorting to on-cluster execution.
+ // For that we use a simple heuristic to determine whether we should read and process CSI in-memory or
+ // on-cluster: total number of rows of the expected projected portion of the index has to be below the
+ // threshold (of 100k records)
+ val shouldReadInMemory = columnStatsIndex.shouldReadInMemory(this, queryReferencedColumns)
+
+ columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory) { transposedColStatsDF =>
+ val indexSchema = transposedColStatsDF.schema
+ val indexFilter =
+ queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema))
+ .reduce(And)
+
+ val allIndexedFileNames =
+ transposedColStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+ .collect()
+ .map(_.getString(0))
+ .toSet
+
+ val prunedCandidateFileNames =
+ transposedColStatsDF.where(new Column(indexFilter))
+ .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+ .collect()
+ .map(_.getString(0))
+ .toSet
+
+ // NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every
+ // base-file: since it's bound to clustering, which could occur asynchronously
+ // at arbitrary point in time, and is not likely to be touching all of the base files.
+ //
+ // To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
+ // files and all outstanding base-files, and make sure that all base files not
+ // represented w/in the index are included in the output of this method
+ val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames)
+
+ Some(prunedCandidateFileNames ++ notIndexedFileNames)
}
}
}
- override def refresh(): Unit = super.refresh()
+ override def refresh(): Unit = {
+ super.refresh()
+ columnStatsIndex.invalidateCaches()
+ }
override def inputFiles: Array[String] =
allFiles.map(_.getPath.toString).toArray
override def sizeInBytes: Long = cachedFileSize
- private def isColumnStatsIndexAvailable =
- metaClient.getTableConfig.getMetadataPartitions
- .contains(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
-
private def isDataSkippingEnabled: Boolean =
options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(),
spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeRDDUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
similarity index 50%
rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeRDDUtils.scala
rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
index 8995701d5f..bd7f2f5456 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeRDDUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala
@@ -21,17 +21,54 @@ package org.apache.spark.sql
import org.apache.hudi.HoodieUnsafeRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.MutablePair
/**
* Suite of utilities helping in handling instances of [[HoodieUnsafeRDD]]
*/
-object HoodieUnsafeRDDUtils {
+object HoodieUnsafeUtils {
- // TODO scala-doc
- def createDataFrame(spark: SparkSession, rdd: RDD[InternalRow], structType: StructType): DataFrame =
- spark.internalCreateDataFrame(rdd, structType)
+ /**
+ * Creates [[DataFrame]] from the in-memory [[Seq]] of [[Row]]s with provided [[schema]]
+ *
+ * NOTE: [[DataFrame]] is based on [[LocalRelation]], entailing that most computations with it
+ * will be executed by Spark locally
+ *
+ * @param spark spark's session
+ * @param rows collection of rows to base [[DataFrame]] on
+ * @param schema target [[DataFrame]]'s schema
+ * @return
+ */
+ def createDataFrameFromRows(spark: SparkSession, rows: Seq[Row], schema: StructType): DataFrame =
+ Dataset.ofRows(spark, LocalRelation.fromExternalRows(schema.toAttributes, rows))
+
+ /**
+ * Creates [[DataFrame]] from the in-memory [[Seq]] of [[InternalRow]]s with provided [[schema]]
+ *
+ * NOTE: [[DataFrame]] is based on [[LocalRelation]], entailing that most computations with it
+ * will be executed by Spark locally
+ *
+ * @param spark spark's session
+ * @param rows collection of rows to base [[DataFrame]] on
+ * @param schema target [[DataFrame]]'s schema
+ * @return
+ */
+ def createDataFrameFromInternalRows(spark: SparkSession, rows: Seq[InternalRow], schema: StructType): DataFrame =
+ Dataset.ofRows(spark, LocalRelation(schema.toAttributes, rows))
+
+
+ /**
+ * Creates [[DataFrame]] from the [[RDD]] of [[InternalRow]]s with provided [[schema]]
+ *
+ * @param spark spark's session
+ * @param rdd RDD w/ [[InternalRow]]s to base [[DataFrame]] on
+ * @param schema target [[DataFrame]]'s schema
+ * @return
+ */
+ def createDataFrameFromRDD(spark: SparkSession, rdd: RDD[InternalRow], schema: StructType): DataFrame =
+ spark.internalCreateDataFrame(rdd, schema)
/**
* Canonical implementation of the [[RDD#collect]] for [[HoodieUnsafeRDD]], returning a properly
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index 1d4dbfb1ea..19027a47bf 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -369,8 +369,9 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
metaClient = HoodieTableMetaClient.reload(metaClient)
case class TestCase(enableMetadata: Boolean,
- enableColumnStats: Boolean,
- enableDataSkipping: Boolean)
+ enableColumnStats: Boolean,
+ enableDataSkipping: Boolean,
+ columnStatsProcessingModeOverride: String = null)
val testCases: Seq[TestCase] =
TestCase(enableMetadata = false, enableColumnStats = false, enableDataSkipping = false) ::
@@ -378,6 +379,8 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
TestCase(enableMetadata = true, enableColumnStats = false, enableDataSkipping = true) ::
TestCase(enableMetadata = false, enableColumnStats = true, enableDataSkipping = true) ::
TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true) ::
+ TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true, columnStatsProcessingModeOverride = HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY) ::
+ TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true, columnStatsProcessingModeOverride = HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE) ::
Nil
for (testCase <- testCases) {
@@ -391,7 +394,8 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
val props = Map[String, String](
"path" -> basePath,
QUERY_TYPE.key -> QUERY_TYPE_SNAPSHOT_OPT_VAL,
- DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> testCase.enableDataSkipping.toString
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> testCase.enableDataSkipping.toString,
+ HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_OVERRIDE.key -> testCase.columnStatsProcessingModeOverride
) ++ readMetadataOpts
val fileIndex = HoodieFileIndex(spark, metaClient, Option.empty, props, NoopCache)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index b982b1851c..822d2051cb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -31,12 +31,12 @@ import org.apache.hudi.functional.TestColumnStatsIndex.ColumnStatsTestCase
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions}
import org.apache.spark.sql._
-import org.apache.spark.sql.functions.typedLit
+import org.apache.spark.sql.functions.{col, lit, typedLit}
import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
import org.junit.jupiter.api._
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+import org.junit.jupiter.params.provider.{Arguments, ArgumentsSource, MethodSource, ValueSource}
import java.math.BigInteger
import java.sql.{Date, Timestamp}
@@ -44,7 +44,7 @@ import scala.collection.JavaConverters._
import scala.util.Random
@Tag("functional")
-class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSupport {
+class TestColumnStatsIndex extends HoodieClientTestBase {
var spark: SparkSession = _
val sourceTableSchema =
@@ -119,35 +119,31 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
.fromProperties(toProperties(metadataOpts))
.build()
- val requestedColumns: Seq[String] = {
- // Providing empty seq of columns to [[readColumnStatsIndex]] will lead to the whole
- // MT to be read, and subsequently filtered
- if (testCase.readFullMetadataTable) Seq.empty
- else sourceTableSchema.fieldNames
- }
+ val requestedColumns: Seq[String] = sourceTableSchema.fieldNames
- val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns)
- val transposedColStatsDF = transposeColumnStatsIndex(spark, colStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
+ val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
val expectedColStatsSchema = composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema)
- // Match against expected column stats table
- val expectedColStatsIndexTableDf =
- spark.read
- .schema(expectedColStatsSchema)
- .json(getClass.getClassLoader.getResource("index/colstats/column-stats-index-table.json").toString)
+ columnStatsIndex.loadTransposed(requestedColumns, testCase.shouldReadInMemory) { transposedColStatsDF =>
+ // Match against expected column stats table
+ val expectedColStatsIndexTableDf =
+ spark.read
+ .schema(expectedColStatsSchema)
+ .json(getClass.getClassLoader.getResource("index/colstats/column-stats-index-table.json").toString)
- assertEquals(expectedColStatsIndexTableDf.schema, transposedColStatsDF.schema)
- // NOTE: We have to drop the `fileName` column as it contains semi-random components
- // that we can't control in this test. Nevertheless, since we manually verify composition of the
- // ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue
- assertEquals(asJson(sort(expectedColStatsIndexTableDf)), asJson(sort(transposedColStatsDF.drop("fileName"))))
+ assertEquals(expectedColStatsIndexTableDf.schema, transposedColStatsDF.schema)
+ // NOTE: We have to drop the `fileName` column as it contains semi-random components
+ // that we can't control in this test. Nevertheless, since we manually verify composition of the
+ // ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue
+ assertEquals(asJson(sort(expectedColStatsIndexTableDf)), asJson(sort(transposedColStatsDF.drop("fileName"))))
- // Collect Column Stats manually (reading individual Parquet files)
- val manualColStatsTableDF =
- buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames, expectedColStatsSchema)
+ // Collect Column Stats manually (reading individual Parquet files)
+ val manualColStatsTableDF =
+ buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames, sourceTableSchema.fieldNames, expectedColStatsSchema)
- assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(transposedColStatsDF)))
+ assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(transposedColStatsDF)))
+ }
// do an upsert and validate
val updateJSONTablePath = getClass.getClassLoader.getResource("index/colstats/another-input-table-json").toString
@@ -166,26 +162,28 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
metaClient = HoodieTableMetaClient.reload(metaClient)
- val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns)
- val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, sourceTableSchema.fieldNames, sourceTableSchema)
+ val updatedColumnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
- val expectedColStatsIndexUpdatedDF =
- spark.read
- .schema(expectedColStatsSchema)
- .json(getClass.getClassLoader.getResource("index/colstats/updated-column-stats-index-table.json").toString)
+ updatedColumnStatsIndex.loadTransposed(requestedColumns, testCase.shouldReadInMemory) { transposedUpdatedColStatsDF =>
+ val expectedColStatsIndexUpdatedDF =
+ spark.read
+ .schema(expectedColStatsSchema)
+ .json(getClass.getClassLoader.getResource("index/colstats/updated-column-stats-index-table.json").toString)
- assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema)
- assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName"))))
+ assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema)
+ assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName"))))
- // Collect Column Stats manually (reading individual Parquet files)
- val manualUpdatedColStatsTableDF =
- buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames, expectedColStatsSchema)
+ // Collect Column Stats manually (reading individual Parquet files)
+ val manualUpdatedColStatsTableDF =
+ buildColumnStatsTableManually(basePath, sourceTableSchema.fieldNames, sourceTableSchema.fieldNames, expectedColStatsSchema)
- assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF)))
+ assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF)))
+ }
}
- @Test
- def testMetadataColumnStatsIndexPartialProjection(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testMetadataColumnStatsIndexPartialProjection(shouldReadInMemory: Boolean): Unit = {
val targetColumnsToIndex = Seq("c1", "c2", "c3")
val metadataOpts = Map(
@@ -235,11 +233,11 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
// These are NOT indexed
val requestedColumns = Seq("c4")
- val emptyColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns)
- val emptyTransposedColStatsDF = transposeColumnStatsIndex(spark, emptyColStatsDF, requestedColumns, sourceTableSchema)
+ val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
- assertEquals(0, emptyColStatsDF.collect().length)
- assertEquals(0, emptyTransposedColStatsDF.collect().length)
+ columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) { emptyTransposedColStatsDF =>
+ assertEquals(0, emptyTransposedColStatsDF.collect().length)
+ }
}
////////////////////////////////////////////////////////////////////////
@@ -252,29 +250,27 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
// We have to include "c1", since we sort the expected outputs by this column
val requestedColumns = Seq("c4", "c1")
- val partialColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns)
- val partialTransposedColStatsDF = transposeColumnStatsIndex(spark, partialColStatsDF, requestedColumns, sourceTableSchema)
-
- val targetIndexedColumns = targetColumnsToIndex.intersect(requestedColumns)
- val expectedColStatsSchema = composeIndexSchema(targetIndexedColumns, sourceTableSchema)
-
+ val expectedColStatsSchema = composeIndexSchema(requestedColumns.sorted, sourceTableSchema)
// Match against expected column stats table
val expectedColStatsIndexTableDf =
spark.read
.schema(expectedColStatsSchema)
.json(getClass.getClassLoader.getResource("index/colstats/partial-column-stats-index-table.json").toString)
- assertEquals(expectedColStatsIndexTableDf.schema, partialTransposedColStatsDF.schema)
- // NOTE: We have to drop the `fileName` column as it contains semi-random components
- // that we can't control in this test. Nevertheless, since we manually verify composition of the
- // ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue
- assertEquals(asJson(sort(expectedColStatsIndexTableDf)), asJson(sort(partialTransposedColStatsDF.drop("fileName"))))
-
// Collect Column Stats manually (reading individual Parquet files)
val manualColStatsTableDF =
- buildColumnStatsTableManually(basePath, targetIndexedColumns, expectedColStatsSchema)
+ buildColumnStatsTableManually(basePath, requestedColumns, targetColumnsToIndex, expectedColStatsSchema)
+
+ val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
- assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(partialTransposedColStatsDF)))
+ columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) { partialTransposedColStatsDF =>
+ assertEquals(expectedColStatsIndexTableDf.schema, partialTransposedColStatsDF.schema)
+ // NOTE: We have to drop the `fileName` column as it contains semi-random components
+ // that we can't control in this test. Nevertheless, since we manually verify composition of the
+ // ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue
+ assertEquals(asJson(sort(expectedColStatsIndexTableDf)), asJson(sort(partialTransposedColStatsDF.drop("fileName"))))
+ assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(partialTransposedColStatsDF)))
+ }
}
////////////////////////////////////////////////////////////////////////
@@ -307,27 +303,26 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
val requestedColumns = sourceTableSchema.fieldNames
- // Nevertheless, the last update was written with a new schema (that is a subset of the original table schema),
- // we should be able to read CSI, which will be properly padded (with nulls) after transposition
- val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns)
- val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, requestedColumns, sourceTableSchema)
-
- val targetIndexedColumns = targetColumnsToIndex.intersect(requestedColumns)
- val expectedColStatsSchema = composeIndexSchema(targetIndexedColumns, sourceTableSchema)
-
+ val expectedColStatsSchema = composeIndexSchema(requestedColumns.sorted, sourceTableSchema)
val expectedColStatsIndexUpdatedDF =
spark.read
.schema(expectedColStatsSchema)
.json(getClass.getClassLoader.getResource("index/colstats/updated-partial-column-stats-index-table.json").toString)
- assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema)
- assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName"))))
-
// Collect Column Stats manually (reading individual Parquet files)
val manualUpdatedColStatsTableDF =
- buildColumnStatsTableManually(basePath, targetIndexedColumns, expectedColStatsSchema)
+ buildColumnStatsTableManually(basePath, requestedColumns, targetColumnsToIndex, expectedColStatsSchema)
- assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF)))
+ val columnStatsIndex = new ColumnStatsIndexSupport(spark, sourceTableSchema, metadataConfig, metaClient)
+
+ // Even though the last update was written with a new schema (that is a subset of the original table schema),
+ // we should be able to read CSI, which will be properly padded (with nulls) after transposition
+ columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) { transposedUpdatedColStatsDF =>
+ assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema)
+
+ assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName"))))
+ assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF)))
+ }
}
}
@@ -370,7 +365,10 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
})
}
- private def buildColumnStatsTableManually(tablePath: String, indexedCols: Seq[String], indexSchema: StructType) = {
+ private def buildColumnStatsTableManually(tablePath: String,
+ includedCols: Seq[String],
+ indexedCols: Seq[String],
+ indexSchema: StructType): DataFrame = {
val files = {
val it = fs.listFiles(new Path(tablePath), true)
var seq = Seq[LocatedFileStatus]()
@@ -387,15 +385,23 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
s"'${typedLit(file.getPath.getName)}' AS file" +:
s"sum(1) AS valueCount" +:
df.columns
- .filter(col => indexedCols.contains(col))
+ .filter(col => includedCols.contains(col))
.flatMap(col => {
val minColName = s"${col}_minValue"
val maxColName = s"${col}_maxValue"
- Seq(
- s"min($col) AS $minColName",
- s"max($col) AS $maxColName",
- s"sum(cast(isnull($col) AS long)) AS ${col}_nullCount"
- )
+ if (indexedCols.contains(col)) {
+ Seq(
+ s"min($col) AS $minColName",
+ s"max($col) AS $maxColName",
+ s"sum(cast(isnull($col) AS long)) AS ${col}_nullCount"
+ )
+ } else {
+ Seq(
+ s"null AS $minColName",
+ s"null AS $maxColName",
+ s"null AS ${col}_nullCount"
+ )
+ }
})
df.selectExpr(exprs: _*)
@@ -461,11 +467,13 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup
object TestColumnStatsIndex {
- case class ColumnStatsTestCase(forceFullLogScan: Boolean, readFullMetadataTable: Boolean)
+ case class ColumnStatsTestCase(forceFullLogScan: Boolean, shouldReadInMemory: Boolean)
def testMetadataColumnStatsIndexParams: java.util.stream.Stream[Arguments] =
java.util.stream.Stream.of(
- Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = false, readFullMetadataTable = false)),
- Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = true, readFullMetadataTable = true))
+ Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = false, shouldReadInMemory = true)),
+ Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = false, shouldReadInMemory = false)),
+ Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = true, shouldReadInMemory = false)),
+ Arguments.arguments(ColumnStatsTestCase(forceFullLogScan = true, shouldReadInMemory = true))
)
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index b24a341d4a..00ab709144 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -30,7 +30,7 @@ import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSo
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.{Dataset, HoodieUnsafeRDDUtils, Row, SaveMode}
+import org.apache.spark.sql.{Dataset, HoodieUnsafeUtils, Row, SaveMode}
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
import org.junit.jupiter.api.{Disabled, Tag, Test}
@@ -316,7 +316,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
val (rows, bytesRead) = measureBytesRead { () =>
val rdd = relation.buildScan(targetColumns, Array.empty).asInstanceOf[HoodieUnsafeRDD]
- HoodieUnsafeRDDUtils.collect(rdd)
+ HoodieUnsafeUtils.collect(rdd)
}
val targetRecordCount = tableState.targetRecordCount;
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index 30af252d2d..3c0282d710 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -22,9 +22,13 @@ import org.apache.avro.Schema
import org.apache.hudi.Spark2RowSerDe
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.spark.sql.avro._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan}
+import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24HoodieParquetFileFormat}
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, Spark2ParsePartitionUtil, SparkParsePartitionUtil}
import org.apache.spark.sql.hudi.SparkAdapter
@@ -32,6 +36,8 @@ import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.{HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, HoodieSpark2CatalystExpressionUtils, HoodieSpark2CatalystPlanUtils, Row, SparkSession}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.StorageLevel._
import scala.collection.mutable.ArrayBuffer
@@ -115,4 +121,20 @@ class Spark2Adapter extends SparkAdapter {
override def createInterpretedPredicate(e: Expression): InterpretedPredicate = {
InterpretedPredicate.create(e)
}
+
+ override def convertStorageLevelToString(level: StorageLevel): String = level match {
+ case NONE => "NONE"
+ case DISK_ONLY => "DISK_ONLY"
+ case DISK_ONLY_2 => "DISK_ONLY_2"
+ case MEMORY_ONLY => "MEMORY_ONLY"
+ case MEMORY_ONLY_2 => "MEMORY_ONLY_2"
+ case MEMORY_ONLY_SER => "MEMORY_ONLY_SER"
+ case MEMORY_ONLY_SER_2 => "MEMORY_ONLY_SER_2"
+ case MEMORY_AND_DISK => "MEMORY_AND_DISK"
+ case MEMORY_AND_DISK_2 => "MEMORY_AND_DISK_2"
+ case MEMORY_AND_DISK_SER => "MEMORY_AND_DISK_SER"
+ case MEMORY_AND_DISK_SER_2 => "MEMORY_AND_DISK_SER_2"
+ case OFF_HEAP => "OFF_HEAP"
+ case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level")
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index 034d21dba4..4f55039746 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -33,6 +33,8 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.{HoodieCatalystPlansUtils, HoodieSpark3CatalystPlanUtils, Row, SparkSession}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.storage.StorageLevel.{DISK_ONLY, DISK_ONLY_2, DISK_ONLY_3, MEMORY_AND_DISK, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER, MEMORY_AND_DISK_SER_2, MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_ONLY_SER, MEMORY_ONLY_SER_2, NONE, OFF_HEAP}
import scala.util.control.NonFatal
@@ -100,4 +102,24 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
override def createInterpretedPredicate(e: Expression): InterpretedPredicate = {
Predicate.createInterpreted(e)
}
+
+ /**
+ * Converts instance of [[StorageLevel]] to a corresponding string
+ */
+ override def convertStorageLevelToString(level: StorageLevel): String = level match {
+ case NONE => "NONE"
+ case DISK_ONLY => "DISK_ONLY"
+ case DISK_ONLY_2 => "DISK_ONLY_2"
+ case DISK_ONLY_3 => "DISK_ONLY_3"
+ case MEMORY_ONLY => "MEMORY_ONLY"
+ case MEMORY_ONLY_2 => "MEMORY_ONLY_2"
+ case MEMORY_ONLY_SER => "MEMORY_ONLY_SER"
+ case MEMORY_ONLY_SER_2 => "MEMORY_ONLY_SER_2"
+ case MEMORY_AND_DISK => "MEMORY_AND_DISK"
+ case MEMORY_AND_DISK_2 => "MEMORY_AND_DISK_2"
+ case MEMORY_AND_DISK_SER => "MEMORY_AND_DISK_SER"
+ case MEMORY_AND_DISK_SER_2 => "MEMORY_AND_DISK_SER_2"
+ case OFF_HEAP => "OFF_HEAP"
+ case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level")
+ }
}