You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/05/14 09:23:31 UTC

spark git commit: [SPARK-7595] [SQL] Window will cause resolve failed with self join

Repository: spark
Updated Branches:
  refs/heads/master d3db2fd66 -> 13e652b61


[SPARK-7595] [SQL] Window will cause resolve failed with self join

For example, given the following table and query:
table: src(key string, value string)
sql: with v1 as(select key, count(value) over (partition by key) cnt_val from src), v2 as(select v1.key, v1_lag.cnt_val from v1, v1 v1_lag where v1.key = v1_lag.key) select * from v2 limit 5;
Analysis then fails while resolving conflicting references in the Join, because both sides of the self join expose the same window-expression alias (cnt_val#94L):
'Limit 5
 'Project [*]
  'Subquery v2
   'Project ['v1.key,'v1_lag.cnt_val]
    'Filter ('v1.key = 'v1_lag.key)
     'Join Inner, None
      Subquery v1
       Project [key#95,cnt_val#94L]
        Window [key#95,value#96], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount(value#96) WindowSpecDefinition [key#95], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS cnt_val#94L], WindowSpecDefinition [key#95], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
         Project [key#95,value#96]
          MetastoreRelation default, src, None
      Subquery v1_lag
       Subquery v1
        Project [key#97,cnt_val#94L]
         Window [key#97,value#98], [HiveWindowFunction#org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount(value#98) WindowSpecDefinition [key#97], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS cnt_val#94L], WindowSpecDefinition [key#97], [], ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
          Project [key#97,value#98]
           MetastoreRelation default, src, None

Conflicting attributes: cnt_val#94L

Author: linweizhong <li...@huawei.com>

Closes #6114 from Sephiroth-Lin/spark-7595 and squashes the following commits:

f8f2637 [linweizhong] Add unit test
dfe9169 [linweizhong] Handle windowExpression with self join


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13e652b6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13e652b6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13e652b6

Branch: refs/heads/master
Commit: 13e652b61a81b2d2e94088006fbd5fd4ed383e3d
Parents: d3db2fd
Author: linweizhong <li...@huawei.com>
Authored: Thu May 14 00:23:27 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Thu May 14 00:23:27 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala |  5 +++++
 .../apache/spark/sql/hive/execution/SQLQuerySuite.scala   | 10 ++++++++++
 2 files changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/13e652b6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a4c6114..4baeeb5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -322,6 +322,11 @@ class Analyzer(
           case oldVersion @ Aggregate(_, aggregateExpressions, _)
               if findAliases(aggregateExpressions).intersect(conflictingAttributes).nonEmpty =>
             (oldVersion, oldVersion.copy(aggregateExpressions = newAliases(aggregateExpressions)))
+
+          case oldVersion @ Window(_, windowExpressions, _, child)
+              if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
+                .nonEmpty =>
+            (oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions)))
         }.headOption.getOrElse { // Only handle first case, others will be fixed on the next pass.
           sys.error(
             s"""

http://git-wip-us.apache.org/repos/asf/spark/blob/13e652b6/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index eaa9d6a..5c7152e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -763,4 +763,14 @@ class SQLQuerySuite extends QueryTest {
       sql("SELECT CASE k WHEN 2 THEN 22 WHEN 4 THEN 44 ELSE 0 END, v FROM t"),
       Row(0, "1") :: Row(22, "2") :: Row(0, "3") :: Row(44, "4") :: Row(0, "5") :: Nil)
   }
+
+  test("SPARK-7595: Window will cause resolve failed with self join") {
+    checkAnswer(sql(
+      """
+        |with
+        | v1 as (select key, count(value) over (partition by key) cnt_val from src),
+        | v2 as (select v1.key, v1_lag.cnt_val from v1, v1 v1_lag where v1.key = v1_lag.key)
+        | select * from v2 order by key limit 1
+      """.stripMargin), Row(0, 3))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org