You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/09/09 01:14:53 UTC

git commit: [SPARK-3349][SQL] Output partitioning of limit should not be inherited from child

Repository: spark
Updated Branches:
  refs/heads/master 08ce18881 -> 7db53391f


[SPARK-3349][SQL] Output partitioning of limit should not be inherited from child

This resolves https://issues.apache.org/jira/browse/SPARK-3349

Author: Eric Liang <ek...@google.com>

Closes #2262 from ericl/spark-3349 and squashes the following commits:

3e1b05c [Eric Liang] add regression test
ac32723 [Eric Liang] make limit/takeOrdered output SinglePartition


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7db53391
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7db53391
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7db53391

Branch: refs/heads/master
Commit: 7db53391f1b349d1f49844197b34f94806f5e336
Parents: 08ce188
Author: Eric Liang <ek...@google.com>
Authored: Mon Sep 8 16:14:32 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Sep 8 16:14:36 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/basicOperators.scala       |  4 +++-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 17 +++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7db53391/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 47bff0c..cac3766 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -27,7 +27,7 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, SinglePartition, UnspecifiedDistribution}
 import org.apache.spark.util.MutablePair
 
 /**
@@ -100,6 +100,7 @@ case class Limit(limit: Int, child: SparkPlan)
   private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
 
   override def output = child.output
+  override def outputPartitioning = SinglePartition
 
   /**
    * A custom implementation modeled after the take function on RDDs but which never runs any job
@@ -173,6 +174,7 @@ case class Limit(limit: Int, child: SparkPlan)
 case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) extends UnaryNode {
 
   override def output = child.output
+  override def outputPartitioning = SinglePartition
 
   val ordering = new RowOrdering(sortOrder, child.output)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7db53391/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 1ac2059..e8fbc28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -359,6 +359,23 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       (null, null, 6, "F") :: Nil)
   }
 
+  test("SPARK-3349 partitioning after limit") {
+    sql("SELECT DISTINCT n FROM lowerCaseData ORDER BY n DESC")
+      .limit(2)
+      .registerTempTable("subset1")
+    sql("SELECT DISTINCT n FROM lowerCaseData")
+      .limit(2)
+      .registerTempTable("subset2")
+    checkAnswer(
+      sql("SELECT * FROM lowerCaseData INNER JOIN subset1 ON subset1.n = lowerCaseData.n"),
+      (3, "c", 3) ::
+      (4, "d", 4) :: Nil)
+    checkAnswer(
+      sql("SELECT * FROM lowerCaseData INNER JOIN subset2 ON subset2.n = lowerCaseData.n"),
+      (1, "a", 1) ::
+      (2, "b", 2) :: Nil)
+  }
+
   test("mixed-case keywords") {
     checkAnswer(
       sql(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org