You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by ak...@apache.org on 2023/01/20 16:29:37 UTC

[hudi] branch master updated: [HUDI-5384] Adding optimization rule to appropriately push down filters into the `HoodieFileIndex` (#7423)

This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b1552eff7af [HUDI-5384] Adding optimization rule to appropriately push down filters into the `HoodieFileIndex` (#7423)
b1552eff7af is described below

commit b1552eff7af5fee8f8b11d051a147240ef619689
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Fri Jan 20 08:29:30 2023 -0800

    [HUDI-5384] Adding optimization rule to appropriately push down filters into the `HoodieFileIndex` (#7423)
    
    ### Change Logs
    
    This is a follow-up for https://github.com/apache/hudi/pull/6680
    
    After transitioning of the `HoodieFileIndex` to do file-listing _lazily_ by default, following issue has been uncovered: due to
    
     - Listing now being delayed (until actual execution of the `FileSourceScanExec` node)
     - Spark not providing a generic `Rule` to push-down the predicates (it has `PruneFileSourcePartitions` but it's only applicable to `CatalogFileIndex`)
    
    Statistics (based on the `FileIndex`) for Hudi's relations have been incorrectly estimated due to now these being delayed until the execution time when partition-predicates are pushed-down to `HoodieFileIndex`.
    
    To work this around we're introducing a new `HoodiePruneFileSourcePartitions` rule that is
    
     - Structurally borrowing from `PruneFileSourcePartitions`
     - Pushes down predicates to `HoodieFileIndex` to perform partition-pruning in time, before subsequent CBO stage
     - Addresses the issue of statistics for Hudi's relations being incorrectly estimated
    
    For more details around the impact of `HoodiePruneFileSourcePartitions`, please check out corresponding `TestHoodiePruneFileSourcePartitions`
---
 .../client/common/HoodieSparkEngineContext.java    |  10 +-
 .../spark/sql/HoodieCatalystExpressionUtils.scala  |  38 ++--
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |  23 ++-
 .../hudi/testutils/HoodieClientTestHarness.java    |   7 +-
 .../apache/hudi/common/util/TablePathUtils.java    |   4 +
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |   4 +-
 .../scala/org/apache/hudi/HoodieFileIndex.scala    |  25 ++-
 .../apache/hudi/SparkHoodieTableFileIndex.scala    |   2 +
 .../sql/hudi/HoodieSparkSessionExtension.scala     |  15 +-
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |  55 +++--
 .../analysis/HoodiePruneFileSourcePartitions.scala | 124 +++++++++++
 .../org/apache/hudi/TestHoodieFileIndex.scala      |  12 +-
 .../hudi/TestNestedSchemaPruningOptimization.scala |   3 +
 .../TestHoodiePruneFileSourcePartitions.scala      | 228 +++++++++++++++++++++
 .../sql/HoodieSpark2CatalystExpressionUtils.scala  |  54 ++++-
 .../apache/spark/sql/adapter/Spark2Adapter.scala   |  25 ++-
 .../sql/HoodieSpark3CatalystExpressionUtils.scala  |  32 +++
 .../spark/sql/adapter/BaseSpark3Adapter.scala      |  40 ++--
 .../sql/HoodieSpark31CatalystExpressionUtils.scala |   4 +-
 .../sql/HoodieSpark32CatalystExpressionUtils.scala |   4 +-
 .../sql/HoodieSpark33CatalystExpressionUtils.scala |   2 +-
 21 files changed, 623 insertions(+), 88 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index d8281d1a10b..c97cb78d8c9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -57,15 +57,15 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
 
   private static final Logger LOG = LogManager.getLogger(HoodieSparkEngineContext.class);
   private final JavaSparkContext javaSparkContext;
-  private SQLContext sqlContext;
+  private final SQLContext sqlContext;
 
   public HoodieSparkEngineContext(JavaSparkContext jsc) {
-    super(new SerializableConfiguration(jsc.hadoopConfiguration()), new SparkTaskContextSupplier());
-    this.javaSparkContext = jsc;
-    this.sqlContext = SQLContext.getOrCreate(jsc.sc());
+    this(jsc, SQLContext.getOrCreate(jsc.sc()));
   }
 
-  public void setSqlContext(SQLContext sqlContext) {
+  public HoodieSparkEngineContext(JavaSparkContext jsc, SQLContext sqlContext) {
+    super(new SerializableConfiguration(jsc.hadoopConfiguration()), new SparkTaskContextSupplier());
+    this.javaSparkContext = jsc;
     this.sqlContext = sqlContext;
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
index 24e50613f50..8a609d7d532 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystExpressionUtils.scala
@@ -19,15 +19,28 @@ package org.apache.spark.sql
 
 import org.apache.hudi.SparkAdapterSupport.sparkAdapter
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction}
-import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateMutableProjection, GenerateUnsafeProjection}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Like, Literal, MutableProjection, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Like, Literal, SubqueryExpression, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
-import scala.annotation.tailrec
 
 trait HoodieCatalystExpressionUtils {
 
+  /**
+   * Returns a filter that its reference is a subset of `outputSet` and it contains the maximum
+   * constraints from `condition`. This is used for predicate push-down
+   * When there is no such filter, `None` is returned.
+   */
+  def extractPredicatesWithinOutputSet(condition: Expression,
+                                       outputSet: AttributeSet): Option[Expression]
+
+  /**
+   * The attribute name may differ from the one in the schema if the query analyzer
+   * is case insensitive. We should change attribute names to match the ones in the schema,
+   * so we do not need to worry about case sensitivity anymore
+   */
+  def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]): Seq[Expression]
+
   /**
    * Matches an expression iff
    *
@@ -83,7 +96,7 @@ object HoodieCatalystExpressionUtils {
     val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
     val targetExprs = to.fields.map(f => attrsMap(f.name))
 
-    GenerateUnsafeProjection.generate(targetExprs, attrs)
+    UnsafeProjection.create(targetExprs, attrs)
   }
 
   /**
@@ -109,23 +122,6 @@ object HoodieCatalystExpressionUtils {
     })
   }
 
-  /**
-   * Generates instance of [[MutableProjection]] projecting row of one [[StructType]] into another [[StructType]]
-   *
-   * NOTE: No safety checks are executed to validate that this projection is actually feasible,
-   *       it's up to the caller to make sure that such projection is possible.
-   *
-   * NOTE: Projection of the row from [[StructType]] A to [[StructType]] B is only possible, if
-   *       B is a subset of A
-   */
-  def generateMutableProjection(from: StructType, to: StructType): MutableProjection = {
-    val attrs = from.toAttributes
-    val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
-    val targetExprs = to.fields.map(f => attrsMap(f.name))
-
-    GenerateMutableProjection.generate(targetExprs, attrs)
-  }
-
   /**
    * Parses and resolves expression against the attributes of the given table schema.
    *
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index ca50d82eae9..f4edf7fb942 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -22,21 +22,18 @@ import org.apache.avro.Schema
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.TablePathUtils
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, LogicalRelation, PartitionedFile, SparkParsePartitionUtil}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.sql.{HoodieCatalogUtils, HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, Row, SQLContext, SparkSession, SparkSessionExtensions}
+import org.apache.spark.sql._
 import org.apache.spark.storage.StorageLevel
 
 import java.util.Locale
@@ -120,9 +117,15 @@ trait SparkAdapter extends Serializable {
   def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
     unfoldSubqueryAliases(table) match {
       case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
-      case relation: UnresolvedRelation =>
-        isHoodieTable(getCatalystPlanUtils.toTableIdentifier(relation), spark)
-      case _=> false
+      // This is to handle the cases when table is loaded by providing
+      // the path to the Spark DS and not from the catalog
+      case LogicalRelation(fsr: HadoopFsRelation, _, _, _) =>
+        fsr.options.get("path").map { pathStr =>
+          val path = new Path(pathStr)
+          TablePathUtils.isHoodieTablePath(path.getFileSystem(spark.sparkContext.hadoopConfiguration), path)
+        } getOrElse(false)
+
+      case _ => false
     }
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 7cebf894a2a..60dd9d653d8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -200,7 +200,6 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness {
     jsc.setLogLevel("ERROR");
 
     hadoopConf = jsc.hadoopConfiguration();
-    context = new HoodieSparkEngineContext(jsc);
 
     sparkSession = SparkSession.builder()
         .withExtensions(JFunction.toScala(sparkSessionExtensions -> {
@@ -209,7 +208,13 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness {
         }))
         .config(jsc.getConf())
         .getOrCreate();
+
     sqlContext = new SQLContext(sparkSession);
+    context = new HoodieSparkEngineContext(jsc, sqlContext);
+
+    // NOTE: It's important to set Spark's `Tests.IS_TESTING` so that our tests are recognized
+    //       as such by Spark
+    System.setProperty("spark.testing", "true");
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java
index 937019db8d0..43f2f83360a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java
@@ -48,6 +48,10 @@ public class TablePathUtils {
     }
   }
 
+  public static boolean isHoodieTablePath(FileSystem fs, Path path) {
+    return hasTableMetadataFolder(fs, path);
+  }
+
   public static Option<Path> getTablePath(FileSystem fs, Path path) throws HoodieException, IOException {
     LOG.info("Getting table path from path : " + path);
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 80914337eec..52df84b17f8 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -237,7 +237,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
    * this variable itself is _lazy_ (and have to stay that way) which guarantees that it's not initialized, until
    * it's actually accessed
    */
-  protected lazy val fileIndex: HoodieFileIndex =
+  lazy val fileIndex: HoodieFileIndex =
     HoodieFileIndex(sparkSession, metaClient, Some(tableStructSchema), optParams,
       FileStatusCache.getOrCreate(sparkSession))
 
@@ -289,6 +289,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       // TODO(HUDI-XXX) internal schema doesn't support nested schema pruning currently
       !hasSchemaOnRead
 
+  override def sizeInBytes: Long = fileIndex.sizeInBytes
+
   override def schema: StructType = {
     // NOTE: Optimizer could prune the schema (applying for ex, [[NestedSchemaPruning]] rule) setting new updated
     //       schema in-place (via [[setPrunedDataSchema]] method), therefore we have to make sure that we pick
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 60f413970a0..003e27233a0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -39,7 +39,9 @@ import org.apache.spark.sql.{Column, SparkSession}
 import org.apache.spark.unsafe.types.UTF8String
 
 import java.text.SimpleDateFormat
+import javax.annotation.concurrent.NotThreadSafe
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.util.control.NonFatal
 import scala.util.{Failure, Success, Try}
 
@@ -66,6 +68,7 @@ import scala.util.{Failure, Success, Try}
  *
  * TODO rename to HoodieSparkSqlFileIndex
  */
+@NotThreadSafe
 case class HoodieFileIndex(spark: SparkSession,
                            metaClient: HoodieTableMetaClient,
                            schemaSpec: Option[StructType],
@@ -82,6 +85,12 @@ case class HoodieFileIndex(spark: SparkSession,
   )
     with FileIndex {
 
+  @transient private var hasPushedDownPartitionPredicates: Boolean = false
+
+  /**
+   * NOTE: [[ColumnStatsIndexSupport]] is a transient state, since it's only relevant while logical plan
+   *       is handled by the Spark's driver
+   */
   @transient private lazy val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
 
   override def rootPaths: Seq[Path] = getQueryPaths.asScala
@@ -162,6 +171,8 @@ case class HoodieFileIndex(spark: SparkSession,
       s"candidate files after data skipping: $candidateFileSize; " +
       s"skipping percentage $skippingRatio")
 
+    hasPushedDownPartitionPredicates = true
+
     if (shouldReadAsPartitionedTable()) {
       listedPartitions
     } else {
@@ -245,6 +256,7 @@ case class HoodieFileIndex(spark: SparkSession,
   override def refresh(): Unit = {
     super.refresh()
     columnStatsIndex.invalidateCaches()
+    hasPushedDownPartitionPredicates = false
   }
 
   override def inputFiles: Array[String] =
@@ -252,8 +264,11 @@ case class HoodieFileIndex(spark: SparkSession,
 
   override def sizeInBytes: Long = getTotalCachedFilesSize
 
+  def hasPredicatesPushedDown: Boolean =
+    hasPushedDownPartitionPredicates
+
   private def isDataSkippingEnabled: Boolean = getConfigValue(options, spark.sessionState.conf,
-    DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false").toBoolean
+    DataSourceReadOptions.ENABLE_DATA_SKIPPING.key, DataSourceReadOptions.ENABLE_DATA_SKIPPING.defaultValue.toString).toBoolean
 
   private def isMetadataTableEnabled: Boolean = metadataConfig.enabled()
 
@@ -297,6 +312,8 @@ object HoodieFileIndex extends Logging {
     val properties = new TypedProperties()
     properties.putAll(options.filter(p => p._2 != null).asJava)
 
+    // TODO(HUDI-5361) clean up properties carry-over
+
     // To support metadata listing via Spark SQL we allow users to pass the config via SQL Conf in spark session. Users
     // would be able to run SET hoodie.metadata.enable=true in the spark sql session to enable metadata listing.
     val isMetadataTableEnabled = getConfigValue(options, sqlConf, HoodieMetadataConfig.ENABLE.key, null)
@@ -304,6 +321,12 @@ object HoodieFileIndex extends Logging {
       properties.setProperty(HoodieMetadataConfig.ENABLE.key(), String.valueOf(isMetadataTableEnabled))
     }
 
+    val listingModeOverride = getConfigValue(options, sqlConf,
+      DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, null)
+    if (listingModeOverride != null) {
+      properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key, listingModeOverride)
+    }
+
     properties
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 66dfc2c161c..1e7ce03d0b1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
+import javax.annotation.concurrent.NotThreadSafe
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
@@ -56,6 +57,7 @@ import scala.language.implicitConversions
  * @param specifiedQueryInstant instant as of which table is being queried
  * @param fileStatusCache transient cache of fetched [[FileStatus]]es
  */
+@NotThreadSafe
 class SparkHoodieTableFileIndex(spark: SparkSession,
                                 metaClient: HoodieTableMetaClient,
                                 schemaSpec: Option[StructType],
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala
index 4e74cbe084d..0e80aca505c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala
@@ -32,10 +32,6 @@ class HoodieSparkSessionExtension extends (SparkSessionExtensions => Unit)
       new HoodieCommonSqlParser(session, parser)
     }
 
-    HoodieAnalysis.customOptimizerRules.foreach { ruleBuilder =>
-      extensions.injectOptimizerRule(ruleBuilder(_))
-    }
-
     HoodieAnalysis.customResolutionRules.foreach { ruleBuilder =>
       extensions.injectResolutionRule(ruleBuilder(_))
     }
@@ -44,6 +40,17 @@ class HoodieSparkSessionExtension extends (SparkSessionExtensions => Unit)
       extensions.injectPostHocResolutionRule(ruleBuilder(_))
     }
 
+    HoodieAnalysis.customOptimizerRules.foreach { ruleBuilder =>
+      extensions.injectOptimizerRule(ruleBuilder(_))
+    }
+
+    /*
+    // CBO is only supported in Spark >= 3.1.x
+    HoodieAnalysis.customPreCBORules.foreach { ruleBuilder =>
+      extensions.injectPreCBORule(ruleBuilder(_))
+    }
+    */
+
     sparkAdapter.injectTableFunctions(extensions)
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index e19f0b39f93..858d914d7b5 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -44,25 +44,6 @@ import scala.collection.mutable.ListBuffer
 object HoodieAnalysis {
   type RuleBuilder = SparkSession => Rule[LogicalPlan]
 
-  def customOptimizerRules: Seq[RuleBuilder] = {
-    if (HoodieSparkUtils.gteqSpark3_1) {
-      val nestedSchemaPruningClass =
-        if (HoodieSparkUtils.gteqSpark3_3) {
-          "org.apache.spark.sql.execution.datasources.Spark33NestedSchemaPruning"
-        } else if (HoodieSparkUtils.gteqSpark3_2) {
-          "org.apache.spark.sql.execution.datasources.Spark32NestedSchemaPruning"
-        } else {
-          // spark 3.1
-          "org.apache.spark.sql.execution.datasources.Spark31NestedSchemaPruning"
-        }
-
-      val nestedSchemaPruningRule = ReflectionUtils.loadClass(nestedSchemaPruningClass).asInstanceOf[Rule[LogicalPlan]]
-      Seq(_ => nestedSchemaPruningRule)
-    } else {
-      Seq.empty
-    }
-  }
-
   def customResolutionRules: Seq[RuleBuilder] = {
     val rules: ListBuffer[RuleBuilder] = ListBuffer(
       // Default rules
@@ -120,6 +101,42 @@ object HoodieAnalysis {
     rules
   }
 
+  def customOptimizerRules: Seq[RuleBuilder] = {
+    val optimizerRules = ListBuffer[RuleBuilder]()
+    if (HoodieSparkUtils.gteqSpark3_1) {
+      val nestedSchemaPruningClass =
+        if (HoodieSparkUtils.gteqSpark3_3) {
+          "org.apache.spark.sql.execution.datasources.Spark33NestedSchemaPruning"
+        } else if (HoodieSparkUtils.gteqSpark3_2) {
+          "org.apache.spark.sql.execution.datasources.Spark32NestedSchemaPruning"
+        } else {
+          // spark 3.1
+          "org.apache.spark.sql.execution.datasources.Spark31NestedSchemaPruning"
+        }
+
+      val nestedSchemaPruningRule = ReflectionUtils.loadClass(nestedSchemaPruningClass).asInstanceOf[Rule[LogicalPlan]]
+      // TODO(HUDI-5443) re-enable
+      //optimizerRules += (_ => nestedSchemaPruningRule)
+    }
+
+    // NOTE: [[HoodiePruneFileSourcePartitions]] is a replica in kind to Spark's
+    //       [[PruneFileSourcePartitions]] and as such should be executed at the same stage.
+    //       However, currently Spark doesn't allow [[SparkSessionExtensions]] to inject into
+    //       [[BaseSessionStateBuilder.customEarlyScanPushDownRules]] even though it could directly
+    //       inject into the Spark's [[Optimizer]]
+    //
+    //       To work this around, we injecting this as the rule that trails pre-CBO, ie it's
+    //          - Triggered before CBO, therefore have access to the same stats as CBO
+    //          - Precedes actual [[customEarlyScanPushDownRules]] invocation
+    optimizerRules += (spark => HoodiePruneFileSourcePartitions(spark))
+
+    optimizerRules
+  }
+
+  /*
+  // CBO is only supported in Spark >= 3.1.x
+  def customPreCBORules: Seq[RuleBuilder] = Seq()
+  */
 }
 
 /**
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
new file mode 100644
index 00000000000..3b86777e16e
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.analysis
+
+import org.apache.hudi.SparkAdapterSupport.sparkAdapter
+import org.apache.hudi.{HoodieBaseRelation, HoodieFileIndex}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, ExpressionSet, NamedExpression, PredicateHelper, SubqueryExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.hudi.analysis.HoodiePruneFileSourcePartitions.{HoodieRelationMatcher, exprUtils, getPartitionFiltersAndDataFilters, rebuildPhysicalOperation}
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Prune the partitions of Hudi table based relations by the means of pushing down the
+ * partition filters
+ *
+ * NOTE: [[HoodiePruneFileSourcePartitions]] is a replica in kind to Spark's [[PruneFileSourcePartitions]]
+ */
+case class HoodiePruneFileSourcePartitions(spark: SparkSession) extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+    case op @ PhysicalOperation(projects, filters, lr @ LogicalRelation(HoodieRelationMatcher(fileIndex), _, _, _))
+      if sparkAdapter.isHoodieTable(lr, spark) && fileIndex.partitionSchema.nonEmpty && !fileIndex.hasPredicatesPushedDown =>
+
+      val deterministicFilters = filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f))
+      val normalizedFilters = exprUtils.normalizeExprs(deterministicFilters, lr.output)
+
+      val (partitionPruningFilters, _) =
+        getPartitionFiltersAndDataFilters(fileIndex.partitionSchema, normalizedFilters)
+
+      // [[HudiFileIndex]] is a caching one, therefore we don't need to reconstruct new relation,
+      // instead we simply just refresh the index and update the stats
+      fileIndex.listFiles(partitionPruningFilters, Seq())
+
+      if (partitionPruningFilters.nonEmpty) {
+        // Change table stats based on the sizeInBytes of pruned files
+        val filteredStats = FilterEstimation(Filter(partitionPruningFilters.reduce(And), lr)).estimate
+        val colStats = filteredStats.map {
+          _.attributeStats.map { case (attr, colStat) =>
+            (attr.name, colStat.toCatalogColumnStat(attr.name, attr.dataType))
+          }
+        }
+
+        val tableWithStats = lr.catalogTable.map(_.copy(
+          stats = Some(
+            CatalogStatistics(
+              sizeInBytes = BigInt(fileIndex.sizeInBytes),
+              rowCount = filteredStats.flatMap(_.rowCount),
+              colStats = colStats.getOrElse(Map.empty)))
+        ))
+
+        val prunedLogicalRelation = lr.copy(catalogTable = tableWithStats)
+        // Keep partition-pruning predicates so that they are visible in physical planning
+        rebuildPhysicalOperation(projects, filters, prunedLogicalRelation)
+      } else {
+        op
+      }
+  }
+
+}
+
+private object HoodiePruneFileSourcePartitions extends PredicateHelper {
+
+  private val exprUtils = sparkAdapter.getCatalystExpressionUtils
+
+  private object HoodieRelationMatcher {
+    def unapply(relation: BaseRelation): Option[HoodieFileIndex] = relation match {
+      case HadoopFsRelation(fileIndex: HoodieFileIndex, _, _, _, _, _) => Some(fileIndex)
+      case r: HoodieBaseRelation => Some(r.fileIndex)
+      case _ => None
+    }
+  }
+
+  private def rebuildPhysicalOperation(projects: Seq[NamedExpression],
+                                       filters: Seq[Expression],
+                                       relation: LeafNode): Project = {
+    val withFilter = if (filters.nonEmpty) {
+      val filterExpression = filters.reduceLeft(And)
+      Filter(filterExpression, relation)
+    } else {
+      relation
+    }
+    Project(projects, withFilter)
+  }
+
+  def getPartitionFiltersAndDataFilters(partitionSchema: StructType,
+                                        normalizedFilters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
+    val partitionColumns = normalizedFilters.flatMap { expr =>
+      expr.collect {
+        case attr: AttributeReference if partitionSchema.names.contains(attr.name) =>
+          attr
+      }
+    }
+    val partitionSet = AttributeSet(partitionColumns)
+    val (partitionFilters, dataFilters) = normalizedFilters.partition(f =>
+      f.references.subsetOf(partitionSet)
+    )
+    val extraPartitionFilter =
+      dataFilters.flatMap(exprUtils.extractPredicatesWithinOutputSet(_, partitionSet))
+    (ExpressionSet(partitionFilters ++ extraPartitionFilter).toSeq, dataFilters)
+  }
+
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
index a0a060d4e28..0f6ae1b16e6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -20,6 +20,7 @@ package org.apache.hudi
 import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.DataSourceReadOptions.{FILE_INDEX_LISTING_MODE_EAGER, FILE_INDEX_LISTING_MODE_LAZY, QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.HoodieFileIndex.DataSkippingFailureMode
 import org.apache.hudi.client.HoodieJavaWriteClient
 import org.apache.hudi.client.common.HoodieJavaEngineContext
@@ -39,17 +40,20 @@ import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
 import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.hudi.util.JFunction
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, GreaterThanOrEqual, LessThan, Literal}
 import org.apache.spark.sql.execution.datasources.{NoopCache, PartitionDirectory}
 import org.apache.spark.sql.functions.{lit, struct}
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
 import org.apache.spark.sql.types.{IntegerType, StringType}
-import org.apache.spark.sql.{DataFrameWriter, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.{DataFrameWriter, Row, SaveMode, SparkSession, SparkSessionExtensions}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, CsvSource, MethodSource, ValueSource}
 
 import java.util.Properties
+import java.util.function.Consumer
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import scala.util.Random
@@ -71,6 +75,12 @@ class TestHoodieFileIndex extends HoodieClientTestBase with ScalaAssertionSuppor
     DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL
   )
 
+  override def getSparkSessionExtensionsInjector: org.apache.hudi.common.util.Option[Consumer[SparkSessionExtensions]] =
+    toJavaOption(
+      Some(
+        JFunction.toJavaConsumer((receiver: SparkSessionExtensions) =>
+          new HoodieSparkSessionExtension().apply(receiver))))
+
   @BeforeEach
   override def setUp() {
     setTableName("hoodie_test")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala
index 87d19d31d96..8c631f9bc2b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestNestedSchemaPruningOptimization.scala
@@ -37,6 +37,8 @@ class TestNestedSchemaPruningOptimization extends HoodieSparkSqlTestBase with Sp
   private def executePlan(plan: LogicalPlan): SparkPlan =
     spark.sessionState.executePlan(plan).executedPlan
 
+  // TODO(HUDI-5443) re-enable
+  /*
   test("Test NestedSchemaPruning optimization (COW/MOR)") {
     withTempDir { tmp =>
       // NOTE: This tests are only relevant for Spark >= 3.1
@@ -117,5 +119,6 @@ class TestNestedSchemaPruningOptimization extends HoodieSparkSqlTestBase with Sp
       }
     }
   }
+  */
 
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
new file mode 100644
index 00000000000..36540b43a40
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.analysis
+
+import org.apache.hudi.HoodieConversionUtils.toJavaOption
+import org.apache.hudi.ScalaAssertionSupport
+import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.hudi.util.JFunction
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, IsNotNull, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Assertions, BeforeEach}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.CsvSource
+
+import java.util.function.Consumer
+
+class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with ScalaAssertionSupport {
+
+  private var spark: SparkSession = _
+
+  @BeforeEach
+  override def setUp() {
+    setTableName("hoodie_test")
+    initPath()
+    initSparkContexts()
+    spark = sqlContext.sparkSession
+  }
+
+  override def getSparkSessionExtensionsInjector: org.apache.hudi.common.util.Option[Consumer[SparkSessionExtensions]] =
+    toJavaOption(
+      Some(
+        JFunction.toJavaConsumer((receiver: SparkSessionExtensions) => new HoodieSparkSessionExtension().apply(receiver)))
+    )
+
+  @ParameterizedTest
+  @CsvSource(value = Array("cow", "mor"))
+  def testPartitionFiltersPushDown(tableType: String): Unit = {
+    spark.sql(
+      s"""
+         |CREATE TABLE $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  ts long,
+         |  partition string
+         |) USING hudi
+         |PARTITIONED BY (partition)
+         |TBLPROPERTIES (
+         |  type = '$tableType',
+         |  primaryKey = 'id',
+         |  preCombineField = 'ts'
+         |)
+         |LOCATION '$basePath/$tableName'
+         """.stripMargin)
+
+    spark.sql(
+      s"""
+         |INSERT INTO $tableName VALUES
+         |  (1, 'a1', 10, 1000, "2021-01-05"),
+         |  (2, 'a2', 20, 2000, "2021-01-06"),
+         |  (3, 'a3', 30, 3000, "2021-01-07")
+         """.stripMargin)
+
+    Seq("eager", "lazy").foreach { listingModeOverride =>
+      // We need to refresh the table to make sure Spark is re-processing the query every time
+      // instead of serving already cached value
+      spark.sessionState.catalog.invalidateAllCachedTables()
+
+      spark.sql(s"SET hoodie.datasource.read.file.index.listing.mode.override=$listingModeOverride")
+
+      val df = spark.sql(s"SELECT * FROM $tableName WHERE partition = '2021-01-05'")
+      val optimizedPlan = df.queryExecution.optimizedPlan
+
+      optimizedPlan match {
+        case f @ Filter(And(IsNotNull(_), EqualTo(attr: AttributeReference, Literal(value, StringType))), lr: LogicalRelation)
+          if attr.name == "partition" && value.toString.equals("2021-01-05") =>
+
+          listingModeOverride match {
+            // Case #1: Eager listing (fallback mode).
+            //          Whole table will be _listed_ before partition-pruning would be applied. This is default
+            //          Spark behavior that naturally occurs, since predicate push-down for tables w/o appropriate catalog
+            //          support (for partition-pruning) will only occur during execution phase, while file-listing
+            //          actually happens during analysis stage
+            case "eager" =>
+              assertEquals(1275, f.stats.sizeInBytes.longValue() / 1024)
+              assertEquals(1275, lr.stats.sizeInBytes.longValue() / 1024)
+
+            // Case #2: Lazy listing (default mode).
+            //          In case of lazy listing mode, Hudi will only list partitions matching partition-predicates that are
+            //          eagerly pushed down (w/ help of [[HoodiePruneFileSourcePartitions]]) avoiding the necessity to
+            //          list the whole table
+            case "lazy" =>
+              assertEquals(425, f.stats.sizeInBytes.longValue() / 1024)
+              assertEquals(425, lr.stats.sizeInBytes.longValue() / 1024)
+
+            case _ => throw new UnsupportedOperationException()
+          }
+
+          val executionPlan = df.queryExecution.executedPlan
+          val expectedPhysicalPlanPartitionFiltersClause = tableType match {
+            case "cow" => s"PartitionFilters: [isnotnull($attr), ($attr = 2021-01-05)]"
+            case "mor" => s"PushedFilters: [IsNotNull(partition), EqualTo(partition,2021-01-05)]"
+          }
+
+          Assertions.assertTrue(executionPlan.toString().contains(expectedPhysicalPlanPartitionFiltersClause))
+
+        case _ =>
+          val failureHint =
+            s"""Expected to see plan like below:
+               |```
+               |== Optimized Logical Plan ==
+               |Filter (isnotnull(partition#74) AND (partition#74 = 2021-01-05)), Statistics(sizeInBytes=...)
+               |+- Relation default.hoodie_test[_hoodie_commit_time#65,_hoodie_commit_seqno#66,_hoodie_record_key#67,_hoodie_partition_path#68,_hoodie_file_name#69,id#70,name#71,price#72,ts#73L,partition#74] parquet, Statistics(sizeInBytes=...)
+               |```
+               |
+               |Instead got
+               |```
+               |$optimizedPlan
+               |```
+               |""".stripMargin.trim
+
+          fail(failureHint)
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @CsvSource(value = Array("cow", "mor"))
+  def testEmptyPartitionFiltersPushDown(tableType: String): Unit = {
+    spark.sql(
+      s"""
+         |CREATE TABLE $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  ts long,
+         |  partition string
+         |) USING hudi
+         |PARTITIONED BY (partition)
+         |TBLPROPERTIES (
+         |  type = '$tableType',
+         |  primaryKey = 'id',
+         |  preCombineField = 'ts'
+         |)
+         |LOCATION '$basePath/$tableName'
+         """.stripMargin)
+
+    spark.sql(
+      s"""
+         |INSERT INTO $tableName VALUES
+         |  (1, 'a1', 10, 1000, "2021-01-05"),
+         |  (2, 'a2', 20, 2000, "2021-01-06"),
+         |  (3, 'a3', 30, 3000, "2021-01-07")
+         """.stripMargin)
+
+    Seq("eager", "lazy").foreach { listingModeOverride =>
+      // We need to refresh the table to make sure Spark is re-processing the query every time
+      // instead of serving already cached value
+      spark.sessionState.catalog.invalidateAllCachedTables()
+
+      spark.sql(s"SET hoodie.datasource.read.file.index.listing.mode.override=$listingModeOverride")
+
+      val df = spark.sql(s"SELECT * FROM $tableName")
+      val optimizedPlan = df.queryExecution.optimizedPlan
+
+      optimizedPlan match {
+        case lr: LogicalRelation  =>
+
+          // When there are no partition pruning predicates pushed down in both cases of lazy/eager listing the whole
+          // table have to be listed
+          listingModeOverride match {
+            case "eager" | "lazy" =>
+              assertEquals(1275, lr.stats.sizeInBytes.longValue() / 1024)
+
+            case _ => throw new UnsupportedOperationException()
+          }
+
+          val executionPlan = df.queryExecution.executedPlan
+          val expectedPhysicalPlanPartitionFiltersClause = tableType match {
+            case "cow" => s"PartitionFilters: []"
+            case "mor" => s"PushedFilters: []"
+          }
+
+          Assertions.assertTrue(executionPlan.toString().contains(expectedPhysicalPlanPartitionFiltersClause))
+
+        case _ =>
+          val failureHint =
+            s"""Expected to see plan like below:
+               |```
+               |== Optimized Logical Plan ==
+               |Filter (isnotnull(partition#74) AND (partition#74 = 2021-01-05)), Statistics(sizeInBytes=...)
+               |+- Relation default.hoodie_test[_hoodie_commit_time#65,_hoodie_commit_seqno#66,_hoodie_record_key#67,_hoodie_partition_path#68,_hoodie_file_name#69,id#70,name#71,price#72,ts#73L,partition#74] parquet, Statistics(sizeInBytes=...)
+               |```
+               |
+               |Instead got
+               |```
+               |$optimizedPlan
+               |```
+               |""".stripMargin.trim
+
+          fail(failureHint)
+      }
+    }
+  }
+
+
+}
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystExpressionUtils.scala
index 854adcff6ab..92ccf02cab6 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystExpressionUtils.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystExpressionUtils.scala
@@ -18,11 +18,63 @@
 package org.apache.spark.sql
 
 import HoodieSparkTypeUtils.isCastPreservingOrdering
-import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Like, Log, Log10, Log1p, Log2, Lower, Multiply, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper}
+import org.apache.spark.sql.catalyst.expressions.{Add, And, Attribute, AttributeReference, AttributeSet, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Like, Log, Log10, Log1p, Log2, Lower, Multiply, Or, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper}
 import org.apache.spark.sql.types.DataType
 
 object HoodieSpark2CatalystExpressionUtils extends HoodieCatalystExpressionUtils {
 
+  // NOTE: This method has been borrowed from Spark 3.1
+  override def extractPredicatesWithinOutputSet(condition: Expression,
+                                                outputSet: AttributeSet): Option[Expression] = condition match {
+    case And(left, right) =>
+      val leftResultOptional = extractPredicatesWithinOutputSet(left, outputSet)
+      val rightResultOptional = extractPredicatesWithinOutputSet(right, outputSet)
+      (leftResultOptional, rightResultOptional) match {
+        case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult))
+        case (Some(leftResult), None) => Some(leftResult)
+        case (None, Some(rightResult)) => Some(rightResult)
+        case _ => None
+      }
+
+    // The Or predicate is convertible when both of its children can be pushed down.
+    // That is to say, if one/both of the children can be partially pushed down, the Or
+    // predicate can be partially pushed down as well.
+    //
+    // Here is an example used to explain the reason.
+    // Let's say we have
+    // condition: (a1 AND a2) OR (b1 AND b2),
+    // outputSet: AttributeSet(a1, b1)
+    // a1 and b1 is convertible, while a2 and b2 is not.
+    // The predicate can be converted as
+    // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
+    // As per the logical in And predicate, we can push down (a1 OR b1).
+    case Or(left, right) =>
+      for {
+        lhs <- extractPredicatesWithinOutputSet(left, outputSet)
+        rhs <- extractPredicatesWithinOutputSet(right, outputSet)
+      } yield Or(lhs, rhs)
+
+    // Here we assume all the `Not` operators is already below all the `And` and `Or` operators
+    // after the optimization rule `BooleanSimplification`, so that we don't need to handle the
+    // `Not` operators here.
+    case other =>
+      if (other.references.subsetOf(outputSet)) {
+        Some(other)
+      } else {
+        None
+      }
+  }
+
+  // NOTE: This method has been borrowed from Spark 3.1
+  override def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]): Seq[Expression] = {
+    exprs.map {
+      _.transform {
+        case a: AttributeReference =>
+          a.withName(attributes.find(_.semanticEquals(a)).getOrElse(a).name)
+      }
+    }
+  }
+
   override def tryMatchAttributeOrderingPreservingTransformation(expr: Expression): Option[AttributeReference] = {
     expr match {
       case OrderPreservingTransformation(attrRef) => Some(attrRef)
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index d537dcef4b4..da53f00e697 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -22,18 +22,20 @@ import org.apache.avro.Schema
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.{AvroConversionUtils, DefaultSource, Spark2HoodieFileScanRDD, Spark2RowSerDe}
+import org.apache.hudi.{AvroConversionUtils, DefaultSource, HoodieBaseRelation, Spark2HoodieFileScanRDD, Spark2RowSerDe}
+import org.apache.spark.sql._
 import org.apache.spark.sql.avro._
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable, LogicalPlan}
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24HoodieParquetFileFormat}
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24HoodieParquetFileFormat}
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
@@ -128,6 +130,23 @@ class Spark2Adapter extends SparkAdapter {
     partitions.toSeq
   }
 
+  override def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
+    super.isHoodieTable(table, spark) ||
+      // NOTE: Following checks extending the logic of the base class specifically for Spark 2.x
+      (unfoldSubqueryAliases(table) match {
+        // This is to handle the cases when table is loaded by providing
+        // the path to the Spark DS and not from the catalog
+        //
+        // NOTE: This logic can't be relocated to the hudi-spark-client
+        case LogicalRelation(_: HoodieBaseRelation, _, _, _) => true
+
+        case relation: UnresolvedRelation =>
+          isHoodieTable(getCatalystPlanUtils.toTableIdentifier(relation), spark)
+
+        case _ => false
+      })
+  }
+
   override def createHoodieParquetFileFormat(appendPartitionValues: Boolean): Option[ParquetFileFormat] = {
     Some(new Spark24HoodieParquetFileFormat(appendPartitionValues))
   }
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystExpressionUtils.scala
new file mode 100644
index 00000000000..b8ddb0a799b
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystExpressionUtils.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, Predicate, PredicateHelper}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+trait HoodieSpark3CatalystExpressionUtils extends HoodieCatalystExpressionUtils
+  with PredicateHelper {
+
+  override def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]): Seq[Expression] =
+    DataSourceStrategy.normalizeExprs(exprs, attributes)
+
+  override def extractPredicatesWithinOutputSet(condition: Expression,
+                                                outputSet: AttributeSet): Option[Expression] =
+    super[PredicateHelper].extractPredicatesWithinOutputSet(condition, outputSet)
+}
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index d0fb7ce6e86..1f82ce260ed 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -19,9 +19,9 @@ package org.apache.spark.sql.adapter
 
 import org.apache.avro.Schema
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.{AvroConversionUtils, DefaultSource, Spark3RowSerDe}
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.{AvroConversionUtils, DefaultSource, HoodieBaseRelation, Spark3RowSerDe}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -33,10 +33,9 @@ import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.hudi.SparkAdapter
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{HoodieSpark3CatalogUtils, Row, SQLContext, SparkSession}
+import org.apache.spark.sql.{HoodieSpark3CatalogUtils, SQLContext, SparkSession}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel._
 
@@ -74,19 +73,28 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
   }
 
   override def isHoodieTable(table: LogicalPlan, spark: SparkSession): Boolean = {
-    unfoldSubqueryAliases(table) match {
-      case LogicalRelation(_, _, Some(table), _) => isHoodieTable(table)
-      case relation: UnresolvedRelation =>
-        try {
-          isHoodieTable(getCatalystPlanUtils.toTableIdentifier(relation), spark)
-        } catch {
-          case NonFatal(e) =>
-            logWarning("Failed to determine whether the table is a hoodie table", e)
-            false
-        }
-      case DataSourceV2Relation(table: Table, _, _, _, _) => isHoodieTable(table.properties())
-      case _=> false
-    }
+    super.isHoodieTable(table, spark) ||
+      // NOTE: Following checks extending the logic of the base class specifically for Spark 3.x
+      (unfoldSubqueryAliases(table) match {
+        case DataSourceV2Relation(table: Table, _, _, _, _) => isHoodieTable(table.properties())
+        // This is to handle the cases when table is loaded by providing
+        // the path to the Spark DS and not from the catalog
+        //
+        // NOTE: This logic can't be relocated to the hudi-spark-client
+        case LogicalRelation(_: HoodieBaseRelation, _, _, _) => true
+
+        case relation: UnresolvedRelation =>
+          // TODO(HUDI-4503) clean-up try catch
+          try {
+            isHoodieTable(getCatalystPlanUtils.toTableIdentifier(relation), spark)
+          } catch {
+            case NonFatal(e) =>
+              logWarning("Failed to determine whether the table is a hoodie table", e)
+              false
+          }
+
+        case _ => false
+      })
   }
 
   override def createInterpretedPredicate(e: Expression): InterpretedPredicate = {
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystExpressionUtils.scala
index 0def10d6e20..d31c6a7b1a2 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystExpressionUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystExpressionUtils.scala
@@ -18,11 +18,11 @@
 
 package org.apache.spark.sql
 
-import HoodieSparkTypeUtils.isCastPreservingOrdering
+import org.apache.spark.sql.HoodieSparkTypeUtils.isCastPreservingOrdering
 import org.apache.spark.sql.catalyst.expressions.{Add, AnsiCast, AttributeReference, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Log, Log10, Log1p, Log2, Lower, Multiply, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper}
 import org.apache.spark.sql.types.DataType
 
-object HoodieSpark31CatalystExpressionUtils extends HoodieCatalystExpressionUtils {
+object HoodieSpark31CatalystExpressionUtils extends HoodieSpark3CatalystExpressionUtils {
 
   override def tryMatchAttributeOrderingPreservingTransformation(expr: Expression): Option[AttributeReference] = {
     expr match {
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystExpressionUtils.scala
index 82e8cfd9b31..52c8de6bf7b 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystExpressionUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystExpressionUtils.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.sql
 
-import HoodieSparkTypeUtils.isCastPreservingOrdering
+import org.apache.spark.sql.HoodieSparkTypeUtils.isCastPreservingOrdering
 import org.apache.spark.sql.catalyst.expressions.{Add, AnsiCast, AttributeReference, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Log, Log10, Log1p, Log2, Lower, Multiply, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper}
 import org.apache.spark.sql.types.DataType
 
-object HoodieSpark32CatalystExpressionUtils extends HoodieCatalystExpressionUtils {
+object HoodieSpark32CatalystExpressionUtils extends HoodieSpark3CatalystExpressionUtils {
 
   override def tryMatchAttributeOrderingPreservingTransformation(expr: Expression): Option[AttributeReference] = {
     expr match {
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystExpressionUtils.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystExpressionUtils.scala
index b540c3ec1c5..d68c9f373fb 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystExpressionUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystExpressionUtils.scala
@@ -21,7 +21,7 @@ import HoodieSparkTypeUtils.isCastPreservingOrdering
 import org.apache.spark.sql.catalyst.expressions.{Add, AnsiCast, AttributeReference, BitwiseOr, Cast, DateAdd, DateDiff, DateFormatClass, DateSub, Divide, Exp, Expm1, Expression, FromUTCTimestamp, FromUnixTime, Log, Log10, Log1p, Log2, Lower, Multiply, ParseToDate, ParseToTimestamp, ShiftLeft, ShiftRight, ToUTCTimestamp, ToUnixTimestamp, Upper}
 import org.apache.spark.sql.types.DataType
 
-object HoodieSpark33CatalystExpressionUtils extends HoodieCatalystExpressionUtils {
+object HoodieSpark33CatalystExpressionUtils extends HoodieSpark3CatalystExpressionUtils {
 
   override def tryMatchAttributeOrderingPreservingTransformation(expr: Expression): Option[AttributeReference] = {
     expr match {