Posted to commits@hudi.apache.org by ak...@apache.org on 2023/01/20 16:29:37 UTC
[hudi] branch master updated: [HUDI-5384] Adding optimization rule to appropriately push down filters into the `HoodieFileIndex` (#7423)
This is an automated email from the ASF dual-hosted git repository.
akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b1552eff7af [HUDI-5384] Adding optimization rule to appropriately push down filters into the `HoodieFileIndex` (#7423)
b1552eff7af is described below
commit b1552eff7af5fee8f8b11d051a147240ef619689
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Fri Jan 20 08:29:30 2023 -0800
[HUDI-5384] Adding optimization rule to appropriately push down filters into the `HoodieFileIndex` (#7423)
### Change Logs
This is a follow-up for https://github.com/apache/hudi/pull/6680
After transitioning the `HoodieFileIndex` to do file-listing _lazily_ by default, the following issue was uncovered: due to
- Listing now being delayed (until actual execution of the `FileSourceScanExec` node)
- Spark not providing a generic `Rule` to push down the predicates (it has `PruneFileSourcePartitions`, but it's only applicable to `CatalogFileIndex`)
statistics (based on the `FileIndex`) for Hudi's relations were being estimated incorrectly, since partition predicates are now pushed down to the `HoodieFileIndex` only at execution time.
To work around this, we're introducing a new `HoodiePruneFileSourcePartitions` rule that
- Structurally borrows from `PruneFileSourcePartitions`
- Pushes predicates down to the `HoodieFileIndex` to perform partition pruning in time, i.e. before the subsequent CBO stage
- Addresses the issue of statistics for Hudi's relations being estimated incorrectly
For more details on the impact of `HoodiePruneFileSourcePartitions`, please check out the corresponding `TestHoodiePruneFileSourcePartitions`.
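A minimal sketch of the observable effect (assuming a Hudi-enabled `SparkSession` and an existing partitioned table named `hoodie_test`; both are hypothetical here — the config key and query shape are taken from the tests in this PR):

```scala
// The listing-mode override key is the one exercised by the tests below
spark.sql("SET hoodie.datasource.read.file.index.listing.mode.override=lazy")

val df = spark.sql("SELECT * FROM hoodie_test WHERE partition = '2021-01-05'")

// With HoodiePruneFileSourcePartitions in place, only the matching partition is
// listed, so sizeInBytes reflects the pruned file-set rather than the whole table
println(df.queryExecution.optimizedPlan.stats.sizeInBytes)
```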
---
.../client/common/HoodieSparkEngineContext.java | 10 +-
.../spark/sql/HoodieCatalystExpressionUtils.scala | 38 ++--
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 23 ++-
.../hudi/testutils/HoodieClientTestHarness.java | 7 +-
.../apache/hudi/common/util/TablePathUtils.java | 4 +
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 4 +-
.../scala/org/apache/hudi/HoodieFileIndex.scala | 25 ++-
.../apache/hudi/SparkHoodieTableFileIndex.scala | 2 +
.../sql/hudi/HoodieSparkSessionExtension.scala | 15 +-
.../spark/sql/hudi/analysis/HoodieAnalysis.scala | 55 +++--
.../analysis/HoodiePruneFileSourcePartitions.scala | 124 +++++++++++
.../org/apache/hudi/TestHoodieFileIndex.scala | 12 +-
.../hudi/TestNestedSchemaPruningOptimization.scala | 3 +
.../TestHoodiePruneFileSourcePartitions.scala | 228 +++++++++++++++++++++
.../sql/HoodieSpark2CatalystExpressionUtils.scala | 54 ++++-
.../apache/spark/sql/adapter/Spark2Adapter.scala | 25 ++-
.../sql/HoodieSpark3CatalystExpressionUtils.scala | 32 +++
.../spark/sql/adapter/BaseSpark3Adapter.scala | 40 ++--
.../sql/HoodieSpark31CatalystExpressionUtils.scala | 4 +-
.../sql/HoodieSpark32CatalystExpressionUtils.scala | 4 +-
.../sql/HoodieSpark33CatalystExpressionUtils.scala | 2 +-
21 files changed, 623 insertions(+), 88 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index d8281d1a10b..c97cb78d8c9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -57,15 +57,15 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
private static final Logger LOG = LogManager.getLogger(HoodieSparkEngineContext.class);
private final JavaSparkContext javaSparkContext;
- private SQLContext sqlContext;
+ private final SQLContext sqlContext;
public HoodieSparkEngineContext(JavaSparkContext jsc) {
- super(new SerializableConfiguration(jsc.hadoopConfiguration()), new SparkTaskContextSupplier());
- this.javaSparkContext = jsc;
- this.sqlContext = SQLContext.getOrCreate(jsc.sc());
+ this(jsc, SQLContext.getOrCreate(jsc.sc()));
}
- public void setSqlContext(SQLContext sqlContext) {
+ public HoodieSparkEngineContext(JavaSparkContext jsc, SQLContext sqlContext) {
+ super(new SerializableConfiguration(jsc.hadoopConfiguration()), new SparkTaskContextSupplier());
+ this.javaSparkContext = jsc;
this.sqlContext = sqlContext;
}
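The two-arg constructor added here lets callers (the test harness below, for one) supply the `SQLContext` explicitly instead of mutating it after construction; a minimal sketch, assuming a local session:

```scala
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("ctx-demo").getOrCreate() // hypothetical session
val jsc = new JavaSparkContext(spark.sparkContext)

// The engine context now shares the caller-provided SQLContext from the start
val engineContext = new HoodieSparkEngineContext(jsc, spark.sqlContext)
```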
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
index 24e50613f50..8a609d7d532 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
@@ -19,15 +19,28 @@ package org.apache.spark.sql
import org.apache.hudi.SparkAdapterSupport.sparkAdapter
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction}
-import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateMutableProjection, GenerateUnsafeProjection}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Like, Literal, MutableProjection, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Like, Literal, SubqueryExpression, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
-import scala.annotation.tailrec
trait HoodieCatalystExpressionUtils {
+ /**
+ * Returns a filter whose references are a subset of `outputSet` and which contains the
+ * maximum constraints from `condition`. This is used for predicate push-down.
+ * When there is no such filter, `None` is returned.
+ */
+ def extractPredicatesWithinOutputSet(condition: Expression,
+ outputSet: AttributeSet): Option[Expression]
+
+ /**
+ * The attribute name may differ from the one in the schema if the query analyzer
+ * is case insensitive. We should change attribute names to match the ones in the schema,
+ * so we do not need to worry about case sensitivity anymore.
+ */
+ def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]): Seq[Expression]
+
/**
* Matches an expression iff
*
@@ -83,7 +96,7 @@ object HoodieCatalystExpressionUtils {
val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
val targetExprs = to.fields.map(f => attrsMap(f.name))
- GenerateUnsafeProjection.generate(targetExprs, attrs)
+ UnsafeProjection.create(targetExprs, attrs)
}
/**
@@ -109,23 +122,6 @@ object HoodieCatalystExpressionUtils {
})
}
- /**
- * Generates instance of [[MutableProjection]] projecting row of one [[StructType]] into another [[StructType]]
- *
- * NOTE: No safety checks are executed to validate that this projection is actually feasible,
- * it's up to the caller to make sure that such projection is possible.
- *
- * NOTE: Projection of the row from [[StructType]] A to [[StructType]] B is only possible, if
- * B is a subset of A
- */
- def generateMutableProjection(from: StructType, to: StructType): MutableProjection = {
- val attrs = from.toAttributes
- val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
- val targetExprs = to.fields.map(f => attrsMap(f.name))
-
- GenerateMutableProjection.generate(targetExprs, attrs)
- }
-
/**
* Parses and resolves expression against the attributes of the given table schema.
*
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index ca50d82eae9..f4edf7fb942 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -22,21 +22,18 @@ import org.apache.avro.Schema
import org.apache.hadoop.fs.Path
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.TablePathUtils
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.sql.{HoodieCatalogUtils, HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, Row, SQLContext, SparkSession, SparkSessionExtensions}
+import org.apache.spark.sql._
import org.apache.spark.storage.StorageLevel
import java.util.Locale
@@ -120,9 +117,15 @@ trait SparkAdapter extends Serializable {
def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
unfoldSubqueryAliases(table) match {
case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
- case relation: UnresolvedRelation =>
- isHoodieTable(getCatalystPlanUtils.toTableIdentifier(relation), spark)
- case _=> false
+ // This is to handle the cases when the table is loaded by providing
+ // the path to the Spark DS rather than from the catalog
+ case LogicalRelation(fsr: HadoopFsRelation, _, _, _) =>
+ fsr.options.get("path").map { pathStr =>
+ val path = new Path(pathStr)
+ TablePathUtils.isHoodieTablePath(path.getFileSystem(spark.sparkContext.hadoopConfiguration), path)
+ } getOrElse(false)
+
+ case _ => false
}
}
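The new branch recognizes path-loaded (non-catalog) relations by probing the filesystem. Standalone, the same check looks like the following (the path is hypothetical; a `spark` session is assumed):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.util.TablePathUtils

val path = new Path("/tmp/hudi_table") // hypothetical base path
val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
val isHudi = TablePathUtils.isHoodieTablePath(fs, path)
```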
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 7cebf894a2a..60dd9d653d8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -200,7 +200,6 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness {
jsc.setLogLevel("ERROR");
hadoopConf = jsc.hadoopConfiguration();
- context = new HoodieSparkEngineContext(jsc);
sparkSession = SparkSession.builder()
.withExtensions(JFunction.toScala(sparkSessionExtensions -> {
@@ -209,7 +208,13 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness {
}))
.config(jsc.getConf())
.getOrCreate();
+
sqlContext = new SQLContext(sparkSession);
+ context = new HoodieSparkEngineContext(jsc, sqlContext);
+
+ // NOTE: It's important to set Spark's `Tests.IS_TESTING` so that our tests are recognized
+ // as such by Spark
+ System.setProperty("spark.testing", "true");
}
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java
index 937019db8d0..43f2f83360a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java
@@ -48,6 +48,10 @@ public class TablePathUtils {
}
}
+ public static boolean isHoodieTablePath(FileSystem fs, Path path) {
+ return hasTableMetadataFolder(fs, path);
+ }
+
public static Option<Path> getTablePath(FileSystem fs, Path path) throws HoodieException, IOException {
LOG.info("Getting table path from path : " + path);
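The new `isHoodieTablePath` helper delegates to the existing metadata-folder check; conceptually it boils down to a filesystem existence test on Hudi's `.hoodie` metadata directory. A sketch, not the actual implementation:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Conceptual sketch: a path is a Hudi table path iff it contains the `.hoodie`
// metadata folder (the real check is TablePathUtils.hasTableMetadataFolder)
def isHoodieTablePathSketch(fs: FileSystem, path: Path): Boolean =
  fs.exists(new Path(path, ".hoodie"))
```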
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 80914337eec..52df84b17f8 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -237,7 +237,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
* this variable itself is _lazy_ (and have to stay that way) which guarantees that it's not initialized, until
* it's actually accessed
*/
- protected lazy val fileIndex: HoodieFileIndex =
+ lazy val fileIndex: HoodieFileIndex =
HoodieFileIndex(sparkSession, metaClient, Some(tableStructSchema), optParams,
FileStatusCache.getOrCreate(sparkSession))
@@ -289,6 +289,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
// TODO(HUDI-XXX) internal schema doesn't support nested schema pruning currently
!hasSchemaOnRead
+ override def sizeInBytes: Long = fileIndex.sizeInBytes
+
override def schema: StructType = {
// NOTE: Optimizer could prune the schema (applying for ex, [[NestedSchemaPruning]] rule) setting new updated
// schema in-place (via [[setPrunedDataSchema]] method), therefore we have to make sure that we pick
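Overriding `sizeInBytes` on the relation is what surfaces the (pruned) file-index size to Spark's stats estimation. A sketch of where it shows up, assuming a DataFrame `df` over a Hudi table:

```scala
import org.apache.spark.sql.execution.datasources.LogicalRelation

// BaseRelation.sizeInBytes feeds LogicalRelation's default statistics; after
// pruning, fileIndex.sizeInBytes accounts only for files of matching partitions
val relationSize = df.queryExecution.optimizedPlan.collectFirst {
  case lr: LogicalRelation => lr.relation.sizeInBytes
}
```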
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 60f413970a0..003e27233a0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -39,7 +39,9 @@ import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.unsafe.types.UTF8String
import java.text.SimpleDateFormat
+import javax.annotation.concurrent.NotThreadSafe
import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
@@ -66,6 +68,7 @@ import scala.util.{Failure, Success, Try}
*
* TODO rename to HoodieSparkSqlFileIndex
*/
+@NotThreadSafe
case class HoodieFileIndex(spark: SparkSession,
metaClient: HoodieTableMetaClient,
schemaSpec: Option[StructType],
@@ -82,6 +85,12 @@ case class HoodieFileIndex(spark: SparkSession,
)
with FileIndex {
+ @transient private var hasPushedDownPartitionPredicates: Boolean = false
+
+ /**
+ * NOTE: [[ColumnStatsIndexSupport]] is transient state, since it's only relevant while the
+ * logical plan is handled by Spark's driver
+ */
@transient private lazy val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
override def rootPaths: Seq[Path] = getQueryPaths.asScala
@@ -162,6 +171,8 @@ case class HoodieFileIndex(spark: SparkSession,
s"candidate files after data skipping: $candidateFileSize; " +
s"skipping percentage $skippingRatio")
+ hasPushedDownPartitionPredicates = true
+
if (shouldReadAsPartitionedTable()) {
listedPartitions
} else {
@@ -245,6 +256,7 @@ case class HoodieFileIndex(spark: SparkSession,
override def refresh(): Unit = {
super.refresh()
columnStatsIndex.invalidateCaches()
+ hasPushedDownPartitionPredicates = false
}
override def inputFiles: Array[String] =
@@ -252,8 +264,11 @@ case class HoodieFileIndex(spark: SparkSession,
override def sizeInBytes: Long = getTotalCachedFilesSize
+ def hasPredicatesPushedDown: Boolean =
+ hasPushedDownPartitionPredicates
+
private def isDataSkippingEnabled: Boolean = getConfigValue(options, spark.sessionState.conf,
- DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false").toBoolean
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, DataSourceReadOptions.ENABLE_DATA_SKIPPING.defaultValue.toString).toBoolean
private def isMetadataTableEnabled: Boolean = metadataConfig.enabled()
@@ -297,6 +312,8 @@ object HoodieFileIndex extends Logging {
val properties = new TypedProperties()
properties.putAll(options.filter(p => p._2 != null).asJava)
+ // TODO(HUDI-5361) clean up properties carry-over
+
// To support metadata listing via Spark SQL we allow users to pass the config via SQL Conf in spark session. Users
// would be able to run SET hoodie.metadata.enable=true in the spark sql session to enable metadata listing.
val isMetadataTableEnabled = getConfigValue(options, sqlConf, HoodieMetadataConfig.ENABLE.key, null)
@@ -304,6 +321,12 @@ object HoodieFileIndex extends Logging {
properties.setProperty(HoodieMetadataConfig.ENABLE.key(), String.valueOf(isMetadataTableEnabled))
}
+ val listingModeOverride = getConfigValue(options, sqlConf,
+ DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, null)
+ if (listingModeOverride != null) {
+ properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, listingModeOverride)
+ }
+
properties
}
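With the property carry-over above, the listing mode can be toggled per session through SQL conf (this key is the one exercised by the tests in this PR):

```scala
// Picked up when HoodieFileIndex builds its config properties (via getConfigValue above)
spark.sql("SET hoodie.datasource.read.file.index.listing.mode.override=eager")
// ... run queries with eager (fallback) listing ...
spark.sql("SET hoodie.datasource.read.file.index.listing.mode.override=lazy")
```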
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 66dfc2c161c..1e7ce03d0b1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
+import javax.annotation.concurrent.NotThreadSafe
import scala.collection.JavaConverters._
import scala.language.implicitConversions
@@ -56,6 +57,7 @@ import scala.language.implicitConversions
* @param specifiedQueryInstant instant as of which table is being queried
* @param fileStatusCache transient cache of fetched [[FileStatus]]es
*/
+@NotThreadSafe
class SparkHoodieTableFileIndex(spark: SparkSession,
metaClient: HoodieTableMetaClient,
schemaSpec: Option[StructType],
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala
index 4e74cbe084d..0e80aca505c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala
@@ -32,10 +32,6 @@ class HoodieSparkSessionExtension extends (SparkSessionExtensions => Unit)
new HoodieCommonSqlParser(session, parser)
}
- HoodieAnalysis.customOptimizerRules.foreach { ruleBuilder =>
- extensions.injectOptimizerRule(ruleBuilder(_))
- }
-
HoodieAnalysis.customResolutionRules.foreach { ruleBuilder =>
extensions.injectResolutionRule(ruleBuilder(_))
}
@@ -44,6 +40,17 @@ class HoodieSparkSessionExtension extends (SparkSessionExtensions => Unit)
extensions.injectPostHocResolutionRule(ruleBuilder(_))
}
+ HoodieAnalysis.customOptimizerRules.foreach { ruleBuilder =>
+ extensions.injectOptimizerRule(ruleBuilder(_))
+ }
+
+ /*
+ // CBO is only supported in Spark >= 3.1.x
+ HoodieAnalysis.customPreCBORules.foreach { ruleBuilder =>
+ extensions.injectPreCBORule(ruleBuilder(_))
+ }
+ */
+
sparkAdapter.injectTableFunctions(extensions)
}
}
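For any of these injections to run, the extension itself must be registered on the session. A typical setup sketch (master and app name are hypothetical; the Kryo serializer config is the commonly recommended Hudi setting):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")              // hypothetical
  .appName("hudi-extensions-demo") // hypothetical
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  .getOrCreate()
```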
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index e19f0b39f93..858d914d7b5 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -44,25 +44,6 @@ import scala.collection.mutable.ListBuffer
object HoodieAnalysis {
type RuleBuilder = SparkSession => Rule[LogicalPlan]
- def customOptimizerRules: Seq[RuleBuilder] = {
- if (HoodieSparkUtils.gteqSpark3_1) {
- val nestedSchemaPruningClass =
- if (HoodieSparkUtils.gteqSpark3_3) {
- "org.apache.spark.sql.execution.datasources.Spark33NestedSchemaPruning"
- } else if (HoodieSparkUtils.gteqSpark3_2) {
- "org.apache.spark.sql.execution.datasources.Spark32NestedSchemaPruning"
- } else {
- // spark 3.1
- "org.apache.spark.sql.execution.datasources.Spark31NestedSchemaPruning"
- }
-
- val nestedSchemaPruningRule = ReflectionUtils.loadClass(nestedSchemaPruningClass).asInstanceOf[Rule[LogicalPlan]]
- Seq(_ => nestedSchemaPruningRule)
- } else {
- Seq.empty
- }
- }
-
def customResolutionRules: Seq[RuleBuilder] = {
val rules: ListBuffer[RuleBuilder] = ListBuffer(
// Default rules
@@ -120,6 +101,42 @@ object HoodieAnalysis {
rules
}
+ def customOptimizerRules: Seq[RuleBuilder] = {
+ val optimizerRules = ListBuffer[RuleBuilder]()
+ if (HoodieSparkUtils.gteqSpark3_1) {
+ val nestedSchemaPruningClass =
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ "org.apache.spark.sql.execution.datasources.Spark33NestedSchemaPruning"
+ } else if (HoodieSparkUtils.gteqSpark3_2) {
+ "org.apache.spark.sql.execution.datasources.Spark32NestedSchemaPruning"
+ } else {
+ // spark 3.1
+ "org.apache.spark.sql.execution.datasources.Spark31NestedSchemaPruning"
+ }
+
+ val nestedSchemaPruningRule = ReflectionUtils.loadClass(nestedSchemaPruningClass).asInstanceOf[Rule[LogicalPlan]]
+ // TODO(HUDI-5443) re-enable
+ //optimizerRules += (_ => nestedSchemaPruningRule)
+ }
+
+ // NOTE: [[HoodiePruneFileSourcePartitions]] is a replica in kind of Spark's
+ // [[PruneFileSourcePartitions]] and as such should be executed at the same stage.
+ // However, Spark currently doesn't allow [[SparkSessionExtensions]] to inject into
+ // [[BaseSessionStateBuilder.customEarlyScanPushDownRules]], even though it can inject
+ // directly into Spark's [[Optimizer]]
+ //
+ // To work around this, we're injecting this as a rule that trails pre-CBO, i.e. it is
+ // - Triggered before CBO, and therefore has access to the same stats as CBO
+ // - Preceding the actual [[customEarlyScanPushDownRules]] invocation
+ optimizerRules += (spark => HoodiePruneFileSourcePartitions(spark))
+
+ optimizerRules
+ }
+
+ /*
+ // CBO is only supported in Spark >= 3.1.x
+ def customPreCBORules: Seq[RuleBuilder] = Seq()
+ */
}
/**
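As the NOTE above spells out, the rule rides the generic optimizer-rule extension point; the wiring (mirroring `HoodieSparkSessionExtension` earlier in this diff) looks like:

```scala
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.hudi.analysis.HoodieAnalysis

// Each RuleBuilder (SparkSession => Rule[LogicalPlan]) is handed to Spark's
// optimizer extension point, so HoodiePruneFileSourcePartitions runs pre-CBO
val inject: SparkSessionExtensions => Unit = { extensions =>
  HoodieAnalysis.customOptimizerRules.foreach { ruleBuilder =>
    extensions.injectOptimizerRule(ruleBuilder(_))
  }
}
```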
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
new file mode 100644
index 00000000000..3b86777e16e
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.analysis
+
+import org.apache.hudi.SparkAdapterSupport.sparkAdapter
+import org.apache.hudi.{HoodieBaseRelation, HoodieFileIndex}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, ExpressionSet, NamedExpression, PredicateHelper, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.hudi.analysis.HoodiePruneFileSourcePartitions.{HoodieRelationMatcher, exprUtils, getPartitionFiltersAndDataFilters, rebuildPhysicalOperation}
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Prunes the partitions of Hudi table-based relations by means of pushing down the
+ * partition filters
+ *
+ * NOTE: [[HoodiePruneFileSourcePartitions]] is a replica in kind of Spark's [[PruneFileSourcePartitions]]
+ */
+case class HoodiePruneFileSourcePartitions(spark: SparkSession) extends Rule[LogicalPlan] {
+
+ override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+ case op @ PhysicalOperation(projects, filters, lr @ LogicalRelation(HoodieRelationMatcher(fileIndex), _, _, _))
+ if sparkAdapter.isHoodieTable(lr, spark) && fileIndex.partitionSchema.nonEmpty && !fileIndex.hasPredicatesPushedDown =>
+
+ val deterministicFilters = filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f))
+ val normalizedFilters = exprUtils.normalizeExprs(deterministicFilters, lr.output)
+
+ val (partitionPruningFilters, _) =
+ getPartitionFiltersAndDataFilters(fileIndex.partitionSchema, normalizedFilters)
+
+ // [[HoodieFileIndex]] is a caching one, therefore we don't need to reconstruct a new relation;
+ // instead we simply refresh the index and update the stats
+ fileIndex.listFiles(partitionPruningFilters, Seq())
+
+ if (partitionPruningFilters.nonEmpty) {
+ // Change table stats based on the sizeInBytes of pruned files
+ val filteredStats = FilterEstimation(Filter(partitionPruningFilters.reduce(And), lr)).estimate
+ val colStats = filteredStats.map {
+ _.attributeStats.map { case (attr, colStat) =>
+ (attr.name, colStat.toCatalogColumnStat(attr.name, attr.dataType))
+ }
+ }
+
+ val tableWithStats = lr.catalogTable.map(_.copy(
+ stats = Some(
+ CatalogStatistics(
+ sizeInBytes = BigInt(fileIndex.sizeInBytes),
+ rowCount = filteredStats.flatMap(_.rowCount),
+ colStats = colStats.getOrElse(Map.empty)))
+ ))
+
+ val prunedLogicalRelation = lr.copy(catalogTable = tableWithStats)
+ // Keep partition-pruning predicates so that they are visible in physical planning
+ rebuildPhysicalOperation(projects, filters, prunedLogicalRelation)
+ } else {
+ op
+ }
+ }
+
+}
+
+private object HoodiePruneFileSourcePartitions extends PredicateHelper {
+
+ private val exprUtils = sparkAdapter.getCatalystExpressionUtils
+
+ private object HoodieRelationMatcher {
+ def unapply(relation: BaseRelation): Option[HoodieFileIndex] = relation match {
+ case HadoopFsRelation(fileIndex: HoodieFileIndex, _, _, _, _, _) => Some(fileIndex)
+ case r: HoodieBaseRelation => Some(r.fileIndex)
+ case _ => None
+ }
+ }
+
+ private def rebuildPhysicalOperation(projects: Seq[NamedExpression],
+ filters: Seq[Expression],
+ relation: LeafNode): Project = {
+ val withFilter = if (filters.nonEmpty) {
+ val filterExpression = filters.reduceLeft(And)
+ Filter(filterExpression, relation)
+ } else {
+ relation
+ }
+ Project(projects, withFilter)
+ }
+
+ def getPartitionFiltersAndDataFilters(partitionSchema: StructType,
+ normalizedFilters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
+ val partitionColumns = normalizedFilters.flatMap { expr =>
+ expr.collect {
+ case attr: AttributeReference if partitionSchema.names.contains(attr.name) =>
+ attr
+ }
+ }
+ val partitionSet = AttributeSet(partitionColumns)
+ val (partitionFilters, dataFilters) = normalizedFilters.partition(f =>
+ f.references.subsetOf(partitionSet)
+ )
+ val extraPartitionFilter =
+ dataFilters.flatMap(exprUtils.extractPredicatesWithinOutputSet(_, partitionSet))
+ (ExpressionSet(partitionFilters ++ extraPartitionFilter).toSeq, dataFilters)
+ }
+
+}
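To make the partition/data filter split concrete, here is a small illustration of the predicate partitioning performed above (attributes constructed by hand; the helper object above is private, so the subset check is re-derived inline):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, EqualTo, GreaterThan, Literal}
import org.apache.spark.sql.types.{IntegerType, StringType}

val partitionAttr = AttributeReference("partition", StringType)()
val priceAttr     = AttributeReference("price", IntegerType)()

val filters = Seq(
  EqualTo(partitionAttr, Literal("2021-01-05")), // references only the partition column
  GreaterThan(priceAttr, Literal(10))            // references a data column
)

// Same subset check as in getPartitionFiltersAndDataFilters: a predicate is a
// partition filter iff all of its references fall within the partition columns
val partitionSet = AttributeSet(Seq(partitionAttr))
val (partitionFilters, dataFilters) = filters.partition(_.references.subsetOf(partitionSet))
// partitionFilters: Seq(partition = '2021-01-05'); dataFilters: Seq(price > 10)
```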
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index a0a060d4e28..0f6ae1b16e6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -20,6 +20,7 @@ package org.apache.hudi
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.DataSourceReadOptions.{FILE_INDEX_LISTING_MODE_EAGER, FILE_INDEX_LISTING_MODE_LAZY, QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.HoodieFileIndex.DataSkippingFailureMode
import org.apache.hudi.client.HoodieJavaWriteClient
import org.apache.hudi.client.common.HoodieJavaEngineContext
@@ -39,17 +40,20 @@ import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.hudi.util.JFunction
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, GreaterThanOrEqual, LessThan, Literal}
import org.apache.spark.sql.execution.datasources.{NoopCache, PartitionDirectory}
import org.apache.spark.sql.functions.{lit, struct}
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.types.{IntegerType, StringType}
-import org.apache.spark.sql.{DataFrameWriter, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.{DataFrameWriter, Row, SaveMode, SparkSession, SparkSessionExtensions}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, CsvSource, MethodSource, ValueSource}
import java.util.Properties
+import java.util.function.Consumer
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.util.Random
@@ -71,6 +75,12 @@ class TestHoodieFileIndex extends HoodieClientTestBase with ScalaAssertionSuppor
DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL
)
+ override def getSparkSessionExtensionsInjector: org.apache.hudi.common.util.Option[Consumer[SparkSessionExtensions]] =
+ toJavaOption(
+ Some(
+ JFunction.toJavaConsumer((receiver: SparkSessionExtensions) =>
+ new HoodieSparkSessionExtension().apply(receiver))))
+
@BeforeEach
override def setUp() {
setTableName("hoodie_test")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala
index 87d19d31d96..8c631f9bc2b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala
@@ -37,6 +37,8 @@ class TestNestedSchemaPruningOptimization extends HoodieSparkSqlTestBase with Sp
private def executePlan(plan: LogicalPlan): SparkPlan =
spark.sessionState.executePlan(plan).executedPlan
+ // TODO(HUDI-5443) re-enable
+ /*
test("Test NestedSchemaPruning optimization (COW/MOR)") {
withTempDir { tmp =>
// NOTE: This tests are only relevant for Spark >= 3.1
@@ -117,5 +119,6 @@ class TestNestedSchemaPruningOptimization extends HoodieSparkSqlTestBase with Sp
}
}
}
+ */
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
new file mode 100644
index 00000000000..36540b43a40
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.analysis
+
+import org.apache.hudi.HoodieConversionUtils.toJavaOption
+import org.apache.hudi.ScalaAssertionSupport
+import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.hudi.util.JFunction
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, IsNotNull, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Assertions, BeforeEach}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.CsvSource
+
+import java.util.function.Consumer
+
+class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with ScalaAssertionSupport {
+
+ private var spark: SparkSession = _
+
+ @BeforeEach
+ override def setUp() {
+ setTableName("hoodie_test")
+ initPath()
+ initSparkContexts()
+ spark = sqlContext.sparkSession
+ }
+
+ override def getSparkSessionExtensionsInjector: org.apache.hudi.common.util.Option[Consumer[SparkSessionExtensions]] =
+ toJavaOption(
+ Some(
+ JFunction.toJavaConsumer((receiver: SparkSessionExtensions) => new HoodieSparkSessionExtension().apply(receiver)))
+ )
+
+ @ParameterizedTest
+ @CsvSource(value = Array("cow", "mor"))
+ def testPartitionFiltersPushDown(tableType: String): Unit = {
+ spark.sql(
+ s"""
+ |CREATE TABLE $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | partition string
+ |) USING hudi
+ |PARTITIONED BY (partition)
+ |TBLPROPERTIES (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ |)
+ |LOCATION '$basePath/$tableName'
+ """.stripMargin)
+
+ spark.sql(
+ s"""
+ |INSERT INTO $tableName VALUES
+ | (1, 'a1', 10, 1000, "2021-01-05"),
+ | (2, 'a2', 20, 2000, "2021-01-06"),
+ | (3, 'a3', 30, 3000, "2021-01-07")
+ """.stripMargin)
+
+ Seq("eager", "lazy").foreach { listingModeOverride =>
+ // We need to refresh the table to make sure Spark is re-processing the query every time
+ // instead of serving an already cached value
+ spark.sessionState.catalog.invalidateAllCachedTables()
+
+ spark.sql(s"SET hoodie.datasource.read.file.index.listing.mode.override=$listingModeOverride")
+
+ val df = spark.sql(s"SELECT * FROM $tableName WHERE partition = '2021-01-05'")
+ val optimizedPlan = df.queryExecution.optimizedPlan
+
+ optimizedPlan match {
+ case f @ Filter(And(IsNotNull(_), EqualTo(attr: AttributeReference, Literal(value, StringType))), lr: LogicalRelation)
+ if attr.name == "partition" && value.toString.equals("2021-01-05") =>
+
+ listingModeOverride match {
+ // Case #1: Eager listing (fallback mode).
+ // The whole table will be _listed_ before partition-pruning is applied. This is the default
+ // Spark behavior that naturally occurs, since predicate push-down for tables w/o appropriate catalog
+ // support (for partition-pruning) only occurs during the execution phase, while file-listing
+ // actually happens during the analysis stage
+ case "eager" =>
+ assertEquals(1275, f.stats.sizeInBytes.longValue() / 1024)
+ assertEquals(1275, lr.stats.sizeInBytes.longValue() / 1024)
+
+ // Case #2: Lazy listing (default mode).
+ // In the lazy listing mode, Hudi will only list partitions matching partition-predicates that are
+ // eagerly pushed down (w/ the help of [[HoodiePruneFileSourcePartitions]]), avoiding the need to
+ // list the whole table
+ case "lazy" =>
+ assertEquals(425, f.stats.sizeInBytes.longValue() / 1024)
+ assertEquals(425, lr.stats.sizeInBytes.longValue() / 1024)
+
+ case _ => throw new UnsupportedOperationException()
+ }
+
+ val executionPlan = df.queryExecution.executedPlan
+ val expectedPhysicalPlanPartitionFiltersClause = tableType match {
+ case "cow" => s"PartitionFilters: [isnotnull($attr), ($attr = 2021-01-05)]"
+ case "mor" => s"PushedFilters: [IsNotNull(partition), EqualTo(partition,2021-01-05)]"
+ }
+
+ Assertions.assertTrue(executionPlan.toString().contains(expectedPhysicalPlanPartitionFiltersClause))
+
+ case _ =>
+ val failureHint =
+ s"""Expected to see plan like below:
+ |```
+ |== Optimized Logical Plan ==
+ |Filter (isnotnull(partition#74) AND (partition#74 = 2021-01-05)), Statistics(sizeInBytes=...)
+ |+- Relation default.hoodie_test[_hoodie_commit_time#65,_hoodie_commit_seqno#66,_hoodie_record_key#67,_hoodie_partition_path#68,_hoodie_file_name#69,id#70,name#71,price#72,ts#73L,partition#74] parquet, Statistics(sizeInBytes=...)
+ |```
+ |
+ |Instead got
+ |```
+ |$optimizedPlan
+ |```
+ |""".stripMargin.trim
+
+ fail(failureHint)
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @CsvSource(value = Array("cow", "mor"))
+ def testEmptyPartitionFiltersPushDown(tableType: String): Unit = {
+ spark.sql(
+ s"""
+ |CREATE TABLE $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | partition string
+ |) USING hudi
+ |PARTITIONED BY (partition)
+ |TBLPROPERTIES (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ |)
+ |LOCATION '$basePath/$tableName'
+ """.stripMargin)
+
+ spark.sql(
+ s"""
+ |INSERT INTO $tableName VALUES
+ | (1, 'a1', 10, 1000, "2021-01-05"),
+ | (2, 'a2', 20, 2000, "2021-01-06"),
+ | (3, 'a3', 30, 3000, "2021-01-07")
+ """.stripMargin)
+
+ Seq("eager", "lazy").foreach { listingModeOverride =>
+ // We need to refresh the table to make sure Spark is re-processing the query every time
+ // instead of serving an already cached value
+ spark.sessionState.catalog.invalidateAllCachedTables()
+
+ spark.sql(s"SET hoodie.datasource.read.file.index.listing.mode.override=$listingModeOverride")
+
+ val df = spark.sql(s"SELECT * FROM $tableName")
+ val optimizedPlan = df.queryExecution.optimizedPlan
+
+ optimizedPlan match {
+ case lr: LogicalRelation =>
+
+ // When there are no partition-pruning predicates pushed down, in both cases of lazy/eager listing
+ // the whole table has to be listed
+ listingModeOverride match {
+ case "eager" | "lazy" =>
+ assertEquals(1275, lr.stats.sizeInBytes.longValue() / 1024)
+
+ case _ => throw new UnsupportedOperationException()
+ }
+
+ val executionPlan = df.queryExecution.executedPlan
+ val expectedPhysicalPlanPartitionFiltersClause = tableType match {
+ case "cow" => s"PartitionFilters: []"
+ case "mor" => s"PushedFilters: []"
+ }
+
+ Assertions.assertTrue(executionPlan.toString().contains(expectedPhysicalPlanPartitionFiltersClause))
+
+ case _ =>
+ val failureHint =
+ s"""Expected to see plan like below:
+ |```
+ |== Optimized Logical Plan ==
+ |Relation default.hoodie_test[_hoodie_commit_time#65,_hoodie_commit_seqno#66,_hoodie_record_key#67,_hoodie_partition_path#68,_hoodie_file_name#69,id#70,name#71,price#72,ts#73L,partition#74] parquet, Statistics(sizeInBytes=...)
+ |```
+ |
+ |Instead got
+ |```
+ |$optimizedPlan
+ |```
+ |""".stripMargin.trim
+
+ fail(failureHint)
+ }
+ }
+ }
+
+
+}
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystExpressionUtils.scala
index 854adcff6ab..92ccf02cab6 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystExpressionUtils.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystExpressionUtils.scala
@@ -18,11 +18,63 @@
package org.apache.spark.sql
import HoodieSparkTypeUtils.isCastPreservingOrdering
-import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Like, Log, Log10, Log1p, Log2, Lower, Multiply, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper}
+import org.apache.spark.sql.catalyst.expressions.{Add, And, Attribute, AttributeReference, AttributeSet, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Like, Log, Log10, Log1p, Log2, Lower, Multiply, Or, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper}
import org.apache.spark.sql.types.DataType
object HoodieSpark2CatalystExpressionUtils extends HoodieCatalystExpressionUtils {
+ // NOTE: This method has been borrowed from Spark 3.1
+ override def extractPredicatesWithinOutputSet(condition: Expression,
+ outputSet: AttributeSet): Option[Expression] = condition match {
+ case And(left, right) =>
+ val leftResultOptional = extractPredicatesWithinOutputSet(left, outputSet)
+ val rightResultOptional = extractPredicatesWithinOutputSet(right, outputSet)
+ (leftResultOptional, rightResultOptional) match {
+ case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult))
+ case (Some(leftResult), None) => Some(leftResult)
+ case (None, Some(rightResult)) => Some(rightResult)
+ case _ => None
+ }
+
+ // The Or predicate is convertible when both of its children can be pushed down.
+ // That is to say, if one/both of the children can be partially pushed down, the Or
+ // predicate can be partially pushed down as well.
+ //
+ // Here is an example used to explain the reason.
+ // Let's say we have
+ // condition: (a1 AND a2) OR (b1 AND b2),
+ // outputSet: AttributeSet(a1, b1)
+ // a1 and b1 are convertible, while a2 and b2 are not.
+ // The predicate can be converted as
+ // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
+ // As per the logic of the And predicate, we can push down (a1 OR b1).
+ case Or(left, right) =>
+ for {
+ lhs <- extractPredicatesWithinOutputSet(left, outputSet)
+ rhs <- extractPredicatesWithinOutputSet(right, outputSet)
+ } yield Or(lhs, rhs)
+
+ // Here we assume all the `Not` operators are already below all the `And` and `Or` operators
+ // after the optimization rule `BooleanSimplification`, so that we don't need to handle the
+ // `Not` operators here.
+ case other =>
+ if (other.references.subsetOf(outputSet)) {
+ Some(other)
+ } else {
+ None
+ }
+ }
+
+ // NOTE: This method has been borrowed from Spark 3.1
+ override def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]): Seq[Expression] = {
+ exprs.map {
+ _.transform {
+ case a: AttributeReference =>
+ a.withName(attributes.find(_.semanticEquals(a)).getOrElse(a).name)
+ }
+ }
+ }
+
override def tryMatchAttributeOrderingPreservingTransformation(expr: Expression): Option[AttributeReference] = {
expr match {
case OrderPreservingTransformation(attrRef) => Some(attrRef)
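A worked example of the borrowed extraction logic, assuming attribute `a` is in the output set and `b` is not:

```scala
import org.apache.spark.sql.HoodieSpark2CatalystExpressionUtils
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, EqualTo, Literal}
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()

// (a = 1) AND (b = 2): only the left conjunct references attributes within the
// output set, so the extraction keeps `a = 1` and drops `b = 2`
val condition = And(EqualTo(a, Literal(1)), EqualTo(b, Literal(2)))
val extracted = HoodieSpark2CatalystExpressionUtils
  .extractPredicatesWithinOutputSet(condition, AttributeSet(Seq(a)))
// extracted == Some(EqualTo(a, Literal(1)))
```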
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index d537dcef4b4..da53f00e697 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -22,18 +22,20 @@ import org.apache.avro.Schema
import org.apache.hadoop.fs.Path
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.{AvroConversionUtils, DefaultSource, Spark2HoodieFileScanRDD, Spark2RowSerDe}
+import org.apache.hudi.{AvroConversionUtils, DefaultSource, HoodieBaseRelation, Spark2HoodieFileScanRDD, Spark2RowSerDe}
+import org.apache.spark.sql._
import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable, LogicalPlan}
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24HoodieParquetFileFormat}
import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24HoodieParquetFileFormat}
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql._
import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
@@ -128,6 +130,23 @@ class Spark2Adapter extends SparkAdapter {
partitions.toSeq
}
+ override def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
+ super.isHoodieTable(table, spark) ||
+ // NOTE: The following checks extend the logic of the base class specifically for Spark 2.x
+ (unfoldSubqueryAliases(table) match {
+ // This is to handle the cases when the table is loaded by providing
+ // the path to the Spark DS rather than from the catalog
+ //
+ // NOTE: This logic can't be relocated to the hudi-spark-client
+ case LogicalRelation(_: HoodieBaseRelation, _, _, _) => true
+
+ case relation: UnresolvedRelation =>
+ isHoodieTable(getCatalystPlanUtils.toTableIdentifier(relation), spark)
+
+ case _ => false
+ })
+ }
+
override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
Some(new Spark24HoodieParquetFileFormat(appendPartitionValues))
}
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystExpressionUtils.scala
new file mode 100644
index 00000000000..b8ddb0a799b
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystExpressionUtils.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, Predicate, PredicateHelper}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+trait HoodieSpark3CatalystExpressionUtils extends HoodieCatalystExpressionUtils
+ with PredicateHelper {
+
+ override def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]): Seq[Expression] =
+ DataSourceStrategy.normalizeExprs(exprs, attributes)
+
+ override def extractPredicatesWithinOutputSet(condition: Expression,
+ outputSet: AttributeSet): Option[Expression] =
+ super[PredicateHelper].extractPredicatesWithinOutputSet(condition, outputSet)
+}
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index d0fb7ce6e86..1f82ce260ed 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -19,9 +19,9 @@ package org.apache.spark.sql.adapter
import org.apache.avro.Schema
import org.apache.hadoop.fs.Path
-import org.apache.hudi.{AvroConversionUtils, DefaultSource, Spark3RowSerDe}
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.{AvroConversionUtils, DefaultSource, HoodieBaseRelation, Spark3RowSerDe}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -33,10 +33,9 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.hudi.SparkAdapter
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{HoodieSpark3CatalogUtils, Row, SQLContext, SparkSession}
+import org.apache.spark.sql.{HoodieSpark3CatalogUtils, SQLContext, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel._
@@ -74,19 +73,28 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
}
override def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
- unfoldSubqueryAliases(table) match {
- case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
- case relation: UnresolvedRelation =>
- try {
- isHoodieTable(getCatalystPlanUtils.toTableIdentifier(relation), spark)
- } catch {
- case NonFatal(e) =>
- logWarning("Failed to determine whether the table is a hoodie table", e)
- false
- }
- case DataSourceV2Relation(table: Table, _, _, _, _) => isHoodieTable(table.properties())
- case _=> false
- }
+ super.isHoodieTable(table, spark) ||
+ // NOTE: The following checks extend the logic of the base class specifically for Spark 3.x
+ (unfoldSubqueryAliases(table) match {
+ case DataSourceV2Relation(table: Table, _, _, _, _) => isHoodieTable(table.properties())
+ // This is to handle the cases when the table is loaded by providing
+ // the path to the Spark DS rather than from the catalog
+ //
+ // NOTE: This logic can't be relocated to the hudi-spark-client
+ case LogicalRelation(_: HoodieBaseRelation, _, _, _) => true
+
+ case relation: UnresolvedRelation =>
+ // TODO(HUDI-4503) clean-up try catch
+ try {
+ isHoodieTable(getCatalystPlanUtils.toTableIdentifier(relation), spark)
+ } catch {
+ case NonFatal(e) =>
+ logWarning("Failed to determine whether the table is a hoodie table", e)
+ false
+ }
+
+ case _ => false
+ })
}
override def createInterpretedPredicate(e: Expression): InterpretedPredicate = {
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystExpressionUtils.scala
index 0def10d6e20..d31c6a7b1a2 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystExpressionUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystExpressionUtils.scala
@@ -18,11 +18,11 @@
package org.apache.spark.sql
-import HoodieSparkTypeUtils.isCastPreservingOrdering
+import org.apache.spark.sql.HoodieSparkTypeUtils.isCastPreservingOrdering
import org.apache.spark.sql.catalyst.expressions.{Add, AnsiCast, AttributeReference, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Log, Log10, Log1p, Log2, Lower, Multiply, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper}
import org.apache.spark.sql.types.DataType
-object HoodieSpark31CatalystExpressionUtils extends HoodieCatalystExpressionUtils {
+object HoodieSpark31CatalystExpressionUtils extends HoodieSpark3CatalystExpressionUtils {
override def tryMatchAttributeOrderingPreservingTransformation(expr: Expression): Option[AttributeReference] = {
expr match {
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystExpressionUtils.scala
index 82e8cfd9b31..52c8de6bf7b 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystExpressionUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystExpressionUtils.scala
@@ -17,11 +17,11 @@
package org.apache.spark.sql
-import HoodieSparkTypeUtils.isCastPreservingOrdering
+import org.apache.spark.sql.HoodieSparkTypeUtils.isCastPreservingOrdering
import org.apache.spark.sql.catalyst.expressions.{Add, AnsiCast, AttributeReference, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Log, Log10, Log1p, Log2, Lower, Multiply, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper}
import org.apache.spark.sql.types.DataType
-object HoodieSpark32CatalystExpressionUtils extends HoodieCatalystExpressionUtils {
+object HoodieSpark32CatalystExpressionUtils extends HoodieSpark3CatalystExpressionUtils {
override def tryMatchAttributeOrderingPreservingTransformation(expr: Expression): Option[AttributeReference] = {
expr match {
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystExpressionUtils.scala
index b540c3ec1c5..d68c9f373fb 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystExpressionUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystExpressionUtils.scala
@@ -21,7 +21,7 @@ import HoodieSparkTypeUtils.isCastPreservingOrdering
import org.apache.spark.sql.catalyst.expressions.{Add, AnsiCast, AttributeReference, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Log, Log10, Log1p, Log2, Lower, Multiply, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper}
import org.apache.spark.sql.types.DataType
-object HoodieSpark33CatalystExpressionUtils extends HoodieCatalystExpressionUtils {
+object HoodieSpark33CatalystExpressionUtils extends HoodieSpark3CatalystExpressionUtils {
override def tryMatchAttributeOrderingPreservingTransformation(expr: Expression): Option[AttributeReference] = {
expr match {