Posted to commits@spark.apache.org by li...@apache.org on 2019/01/11 17:21:28 UTC

[spark] branch branch-2.4 updated: [SPARK-26576][SQL] Broadcast hint not applied to partitioned table

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new b9eb0e8  [SPARK-26576][SQL] Broadcast hint not applied to partitioned table
b9eb0e8 is described below

commit b9eb0e85de3317a7f4c89a90082f7793b645c6ea
Author: John Zhuge <jz...@apache.org>
AuthorDate: Fri Jan 11 09:21:13 2019 -0800

    [SPARK-26576][SQL] Broadcast hint not applied to partitioned table
    
    ## What changes were proposed in this pull request?
    
    Make sure the broadcast hint is applied to partitioned tables.
    
    Since the issue exists in branches 2.0 through 2.4, but not in master, I created this PR for branch-2.4.
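    
    For illustration, the scenario this fixes (mirroring the new regression test below; the table and column names `tbl` and `p` are just examples):
    
    ```scala
    import org.apache.spark.sql.functions.broadcast
    
    // A partitioned table, as in the regression test added by this commit.
    spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("tbl")
    
    // Disable size-based auto-broadcast so only the explicit hint can trigger a broadcast join.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    
    val df = spark.table("tbl")
    // Before this fix the hint was dropped for partitioned tables and a non-broadcast
    // join (typically SortMergeJoin) was planned; with the fix this plans a BroadcastHashJoin.
    df.join(broadcast(df), "p").explain()
    ```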
    
    ## How was this patch tested?
    
    - A new unit test in PruneFileSourcePartitionsSuite
    - Unit test suites touched by SPARK-14581: JoinOptimizationSuite, FilterPushdownSuite, ColumnPruningSuite, and PruneFiltersSuite
    
    cloud-fan davies rxin
    
    Closes #23507 from jzhuge/SPARK-26576.
    
    Authored-by: John Zhuge <jz...@apache.org>
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 .../apache/spark/sql/catalyst/planning/patterns.scala |  3 ---
 .../execution/PruneFileSourcePartitionsSuite.scala    | 19 ++++++++++++++++++-
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 84be677..d91b890 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -66,9 +66,6 @@ object PhysicalOperation extends PredicateHelper {
         val substitutedCondition = substitute(aliases)(condition)
         (fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases)
 
-      case h: ResolvedHint =>
-        collectProjectsAndFilters(h.child)
-
       case other =>
         (None, Nil, other, Map.empty)
     }
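
(Editorial note, not part of the patch.) In branch-2.4, `PhysicalOperation` looked straight through `ResolvedHint`, so a rule that matches `PhysicalOperation(projects, filters, relation)` and rebuilds the matched subtree from the collected pieces, as `PruneFileSourcePartitions` does for partitioned file-source tables, reconstructed the plan without the hint node, and the broadcast hint was silently lost. A minimal, self-contained toy sketch of that failure mode (plain Scala, not Spark code; all names are hypothetical):

```scala
sealed trait Plan
case class Relation(name: String) extends Plan
case class Filter(cond: String, child: Plan) extends Plan
case class Hint(child: Plan) extends Plan

// Hypothetical collector that, like the removed case above, looks through Hint.
def collect(p: Plan): (List[String], Plan) = p match {
  case Filter(c, child) =>
    val (filters, leaf) = collect(child)
    (c :: filters, leaf)
  case Hint(child) => collect(child) // the problematic case
  case other       => (Nil, other)
}

// A pruning-style rewrite that rebuilds the subtree from the collected pieces.
def prune(p: Plan): Plan = {
  val (filters, leaf) = collect(p)
  filters.foldRight(leaf)(Filter(_, _)) // the Hint wrapper is never put back
}

println(prune(Hint(Filter("p = 0", Relation("tbl")))))
// Filter(p = 0,Relation(tbl)) -- the hint is gone
```

With the `ResolvedHint` case removed, the pattern no longer matches across the hint, so the hint node survives optimization; the new test below asserts exactly that by collecting `ResolvedHint` from the optimized plan and `BroadcastHashJoinExec` from the physical plan.
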
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
index 9438418..8a9adf7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -17,15 +17,20 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.scalatest.Matchers._
+
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, ResolvedHint}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
+import org.apache.spark.sql.functions.broadcast
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.StructType
 
@@ -91,4 +96,16 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
       assert(size2 < tableStats.get.sizeInBytes)
     }
   }
+
+  test("SPARK-26576 Broadcast hint not applied to partitioned table") {
+    withTable("tbl") {
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+        spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("tbl")
+        val df = spark.table("tbl")
+        val qe = df.join(broadcast(df), "p").queryExecution
+        qe.optimizedPlan.collect { case _: ResolvedHint => } should have size 1
+        qe.sparkPlan.collect { case j: BroadcastHashJoinExec => j } should have size 1
+      }
+    }
+  }
 }

