You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/10/04 22:38:21 UTC

[GitHub] [spark] sunchao commented on a change in pull request #32583: [SPARK-35437][SQL] Use expressions to filter Hive partitions at client side

sunchao commented on a change in pull request #32583:
URL: https://github.com/apache/spark/pull/32583#discussion_r721729139



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1008,6 +1008,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK =
+    buildConf("spark.sql.hive.metastorePartitionPruningFastFallback")
+      .doc(s"When true and " +
+        s"we cannot do filtering on the server(${HIVE_METASTORE_PARTITION_PRUNING.key})," +

Review comment:
       nit: add a space at the end

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1008,6 +1008,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK =
+    buildConf("spark.sql.hive.metastorePartitionPruningFastFallback")
+      .doc(s"When true and " +

Review comment:
       nit: the `s` interpolator prefix is unnecessary here, since this string literal contains no interpolation

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
##########
@@ -156,29 +157,38 @@ object ExternalCatalogUtils {
     } else {
       val partitionSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
         catalogTable.partitionSchema)
-      val partitionColumnNames = catalogTable.partitionColumnNames.toSet
-
-      val nonPartitionPruningPredicates = predicates.filterNot {
-        _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
-      }
-      if (nonPartitionPruningPredicates.nonEmpty) {
-        throw QueryCompilationErrors.nonPartitionPruningPredicatesNotExpectedError(
-          nonPartitionPruningPredicates)
-      }
-
-      val boundPredicate =
-        Predicate.createInterpreted(predicates.reduce(And).transform {
-          case att: AttributeReference =>
-            val index = partitionSchema.indexWhere(_.name == att.name)
-            BoundReference(index, partitionSchema(index).dataType, nullable = true)
-        })
+      val boundPredicate = generatePartitionPredicateByFilter(catalogTable,
+        partitionSchema, predicates)
 
       inputPartitions.filter { p =>
         boundPredicate.eval(p.toRow(partitionSchema, defaultTimeZoneId))
       }
     }
   }
 
+  def generatePartitionPredicateByFilter(
+      catalogTable: CatalogTable,
+      partitionSchema: StructType,
+      predicates: Seq[Expression]): BasePredicate = {
+    val partitionColumnNames = catalogTable.partitionColumnNames.toSet
+
+    val nonPartitionPruningPredicates = predicates.filterNot {
+      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+    }
+    if (nonPartitionPruningPredicates.nonEmpty) {
+      throw QueryCompilationErrors.nonPartitionPruningPredicatesNotExpectedError(

Review comment:
       This looks like a Spark internal error (caused by the implementor) instead of a user-facing error, so I think `IllegalStateException` or a precondition check is more appropriate. I know we just inherited this from the old code path, so feel free to ignore.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1008,6 +1008,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK =
+    buildConf("spark.sql.hive.metastorePartitionPruningFastFallback")
+      .doc(s"When true and " +
+        s"we cannot do filtering on the server(${HIVE_METASTORE_PARTITION_PRUNING.key})," +
+        "pruning partition by getting the partition names first " +

Review comment:
       suggestion: "Spark will instead prune partitions by getting the partition names first and then evaluating the filter expressions on the client side."

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
##########
@@ -156,29 +157,38 @@ object ExternalCatalogUtils {
     } else {
       val partitionSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
         catalogTable.partitionSchema)
-      val partitionColumnNames = catalogTable.partitionColumnNames.toSet
-
-      val nonPartitionPruningPredicates = predicates.filterNot {
-        _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
-      }
-      if (nonPartitionPruningPredicates.nonEmpty) {
-        throw QueryCompilationErrors.nonPartitionPruningPredicatesNotExpectedError(
-          nonPartitionPruningPredicates)
-      }
-
-      val boundPredicate =
-        Predicate.createInterpreted(predicates.reduce(And).transform {
-          case att: AttributeReference =>
-            val index = partitionSchema.indexWhere(_.name == att.name)
-            BoundReference(index, partitionSchema(index).dataType, nullable = true)
-        })
+      val boundPredicate = generatePartitionPredicateByFilter(catalogTable,
+        partitionSchema, predicates)
 
       inputPartitions.filter { p =>
         boundPredicate.eval(p.toRow(partitionSchema, defaultTimeZoneId))
       }
     }
   }
 
+  def generatePartitionPredicateByFilter(
+      catalogTable: CatalogTable,
+      partitionSchema: StructType,
+      predicates: Seq[Expression]): BasePredicate = {
+    val partitionColumnNames = catalogTable.partitionColumnNames.toSet
+
+    val nonPartitionPruningPredicates = predicates.filterNot {
+      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+    }
+    if (nonPartitionPruningPredicates.nonEmpty) {
+      throw QueryCompilationErrors.nonPartitionPruningPredicatesNotExpectedError(
+        nonPartitionPruningPredicates)
+    }
+
+    val boundPredicate =

Review comment:
       nit: `boundPredicate` is unnecessary and we can just return from here




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org