You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/04/20 04:07:25 UTC

spark git commit: [SPARK-23877][SQL] Use filter predicates to prune partitions in metadata-only queries

Repository: spark
Updated Branches:
  refs/heads/master e55953b0b -> b3fde5a41


[SPARK-23877][SQL] Use filter predicates to prune partitions in metadata-only queries

## What changes were proposed in this pull request?

This updates the OptimizeMetadataOnlyQuery rule to use filter expressions when listing partitions, if there are filter nodes in the logical plan. This avoids listing all partitions for large tables on the driver.

This also fixes a minor bug where the partitions returned from fsRelation cannot be serialized without hitting a stack level too deep error. This is caused by serializing a stream to executors, where the stream is a recursive structure. If the stream is too long, the serialization stack reaches the maximum level of depth. The fix is to create a LocalRelation using an Array instead of the incoming Seq.

## How was this patch tested?

Existing tests for metadata-only queries.

Author: Ryan Blue <bl...@apache.org>

Closes #20988 from rdblue/SPARK-23877-metadata-only-push-filters.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3fde5a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3fde5a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3fde5a4

Branch: refs/heads/master
Commit: b3fde5a41ee625141b9d21ce32ea68c082449430
Parents: e55953b
Author: Ryan Blue <bl...@apache.org>
Authored: Fri Apr 20 12:06:41 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Apr 20 12:06:41 2018 +0800

----------------------------------------------------------------------
 .../execution/OptimizeMetadataOnlyQuery.scala   | 94 +++++++++++++-------
 .../OptimizeHiveMetadataOnlyQuerySuite.scala    | 68 ++++++++++++++
 2 files changed, 132 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b3fde5a4/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index dc4aff9..acbd4be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -49,9 +49,9 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
     }
 
     plan.transform {
-      case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) =>
+      case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(_, attrs, filters, rel)) =>
         // We only apply this optimization when only partitioned attributes are scanned.
-        if (a.references.subsetOf(partAttrs)) {
+        if (a.references.subsetOf(attrs)) {
           val aggFunctions = aggExprs.flatMap(_.collect {
             case agg: AggregateExpression => agg
           })
@@ -67,7 +67,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
             })
           }
           if (isAllDistinctAgg) {
-            a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
+            a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, rel, filters)))
           } else {
             a
           }
@@ -98,14 +98,27 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
    */
   private def replaceTableScanWithPartitionMetadata(
       child: LogicalPlan,
-      relation: LogicalPlan): LogicalPlan = {
+      relation: LogicalPlan,
+      partFilters: Seq[Expression]): LogicalPlan = {
+    // this logic comes from PruneFileSourcePartitions. it ensures that the filter names match the
+    // relation's schema. PartitionedRelation ensures that the filters only reference partition cols
+    val relFilters = partFilters.map { e =>
+      e transform {
+        case a: AttributeReference =>
+          a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+      }
+    }
+
     child transform {
       case plan if plan eq relation =>
         relation match {
           case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, isStreaming) =>
             val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
-            val partitionData = fsRelation.location.listFiles(Nil, Nil)
-            LocalRelation(partAttrs, partitionData.map(_.values), isStreaming)
+            val partitionData = fsRelation.location.listFiles(relFilters, Nil)
+            // partition data may be a stream, which can cause serialization to hit stack level too
+            // deep exceptions because it is a recursive structure in memory. converting to array
+            // avoids the problem.
+            LocalRelation(partAttrs, partitionData.map(_.values).toArray, isStreaming)
 
           case relation: HiveTableRelation =>
             val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
@@ -113,12 +126,21 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
               CaseInsensitiveMap(relation.tableMeta.storage.properties)
             val timeZoneId = caseInsensitiveProperties.get(DateTimeUtils.TIMEZONE_OPTION)
               .getOrElse(SQLConf.get.sessionLocalTimeZone)
-            val partitionData = catalog.listPartitions(relation.tableMeta.identifier).map { p =>
+            val partitions = if (partFilters.nonEmpty) {
+              catalog.listPartitionsByFilter(relation.tableMeta.identifier, relFilters)
+            } else {
+              catalog.listPartitions(relation.tableMeta.identifier)
+            }
+
+            val partitionData = partitions.map { p =>
               InternalRow.fromSeq(partAttrs.map { attr =>
                 Cast(Literal(p.spec(attr.name)), attr.dataType, Option(timeZoneId)).eval()
               })
             }
-            LocalRelation(partAttrs, partitionData)
+            // partition data may be a stream, which can cause serialization to hit stack level too
+            // deep exceptions because it is a recursive structure in memory. converting to array
+            // avoids the problem.
+            LocalRelation(partAttrs, partitionData.toArray)
 
           case _ =>
             throw new IllegalStateException(s"unrecognized table scan node: $relation, " +
@@ -129,35 +151,47 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
 
   /**
    * A pattern that finds the partitioned table relation node inside the given plan, and returns a
-   * pair of the partition attributes and the table relation node.
+   * pair of the partition attributes, partition filters, and the table relation node.
    *
    * It keeps traversing down the given plan tree if there is a [[Project]] or [[Filter]] with
    * deterministic expressions, and returns result after reaching the partitioned table relation
    * node.
    */
-  object PartitionedRelation {
-
-    def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match {
-      case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
-        if fsRelation.partitionSchema.nonEmpty =>
-        val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
-        Some((AttributeSet(partAttrs), l))
-
-      case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
-        val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
-        Some((AttributeSet(partAttrs), relation))
-
-      case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
-        unapply(child).flatMap { case (partAttrs, relation) =>
-          if (p.references.subsetOf(partAttrs)) Some((p.outputSet, relation)) else None
-        }
+  object PartitionedRelation extends PredicateHelper {
+
+    def unapply(
+        plan: LogicalPlan): Option[(AttributeSet, AttributeSet, Seq[Expression], LogicalPlan)] = {
+      plan match {
+        case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
+          if fsRelation.partitionSchema.nonEmpty =>
+          val partAttrs = AttributeSet(getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l))
+          Some((partAttrs, partAttrs, Nil, l))
+
+        case relation: HiveTableRelation if relation.tableMeta.partitionColumnNames.nonEmpty =>
+          val partAttrs = AttributeSet(
+            getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation))
+          Some((partAttrs, partAttrs, Nil, relation))
+
+        case p @ Project(projectList, child) if projectList.forall(_.deterministic) =>
+          unapply(child).flatMap { case (partAttrs, attrs, filters, relation) =>
+            if (p.references.subsetOf(attrs)) {
+              Some((partAttrs, p.outputSet, filters, relation))
+            } else {
+              None
+            }
+          }
 
-      case f @ Filter(condition, child) if condition.deterministic =>
-        unapply(child).flatMap { case (partAttrs, relation) =>
-          if (f.references.subsetOf(partAttrs)) Some((partAttrs, relation)) else None
-        }
+        case f @ Filter(condition, child) if condition.deterministic =>
+          unapply(child).flatMap { case (partAttrs, attrs, filters, relation) =>
+            if (f.references.subsetOf(partAttrs)) {
+              Some((partAttrs, attrs, splitConjunctivePredicates(condition) ++ filters, relation))
+            } else {
+              None
+            }
+          }
 
-      case _ => None
+        case _ => None
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b3fde5a4/sql/hive/src/test/scala/org/apache/spark/sql/hive/OptimizeHiveMetadataOnlyQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/OptimizeHiveMetadataOnlyQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/OptimizeHiveMetadataOnlyQuerySuite.scala
new file mode 100644
index 0000000..95f192f
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/OptimizeHiveMetadataOnlyQuerySuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.plans.logical.{Distinct, Filter, Project, SubqueryAlias}
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+class OptimizeHiveMetadataOnlyQuerySuite extends QueryTest with TestHiveSingleton
+    with BeforeAndAfter with SQLTestUtils {
+
+  import spark.implicits._
+
+  before {
+    sql("CREATE TABLE metadata_only (id bigint, data string) PARTITIONED BY (part int)")
+    (0 to 10).foreach(p => sql(s"ALTER TABLE metadata_only ADD PARTITION (part=$p)"))
+  }
+
+  test("SPARK-23877: validate metadata-only query pushes filters to metastore") {
+    withTable("metadata_only") {
+      val startCount = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount
+
+      // verify the number of matching partitions
+      assert(sql("SELECT DISTINCT part FROM metadata_only WHERE part < 5").collect().length === 5)
+
+      // verify that the partition predicate was pushed down to the metastore
+      assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount - startCount === 5)
+    }
+  }
+
+  test("SPARK-23877: filter on projected expression") {
+    withTable("metadata_only") {
+      val startCount = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount
+
+      // verify the matching partitions
+      val partitions = spark.internalCreateDataFrame(Distinct(Filter(($"x" < 5).expr,
+        Project(Seq(($"part" + 1).as("x").expr.asInstanceOf[NamedExpression]),
+          spark.table("metadata_only").logicalPlan.asInstanceOf[SubqueryAlias].child)))
+          .queryExecution.toRdd, StructType(Seq(StructField("x", IntegerType))))
+
+      checkAnswer(partitions, Seq(1, 2, 3, 4).toDF("x"))
+
+      // verify that the partition predicate was not pushed down to the metastore
+      assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount - startCount == 11)
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org