You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2018/04/13 03:00:28 UTC

spark git commit: [SPARK-23748][SS] Fix SS continuous process doesn't support SubqueryAlias issue

Repository: spark
Updated Branches:
  refs/heads/master 682002b6d -> 14291b061


[SPARK-23748][SS] Fix SS continuous process doesn't support SubqueryAlias issue

## What changes were proposed in this pull request?

Currently, SS continuous processing doesn't support querying a temp table or `df.as("xxx")`; SS will throw an exception saying the LogicalPlan is not supported, as described [here](https://issues.apache.org/jira/browse/SPARK-23748).

So this patch proposes to add that support.

## How was this patch tested?

A new unit test was added.

Author: jerryshao <ss...@hortonworks.com>

Closes #21017 from jerryshao/SPARK-23748.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14291b06
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14291b06
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14291b06

Branch: refs/heads/master
Commit: 14291b061b9b40eadbf4ed442f9a5021b8e09597
Parents: 682002b
Author: jerryshao <ss...@hortonworks.com>
Authored: Thu Apr 12 20:00:25 2018 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Apr 12 20:00:25 2018 -0700

----------------------------------------------------------------------
 .../analysis/UnsupportedOperationChecker.scala   |  2 +-
 .../streaming/continuous/ContinuousSuite.scala   | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/14291b06/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index b55043c..ff9d6d7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -345,7 +345,7 @@ object UnsupportedOperationChecker {
     plan.foreachUp { implicit subPlan =>
       subPlan match {
         case (_: Project | _: Filter | _: MapElements | _: MapPartitions |
-              _: DeserializeToObject | _: SerializeFromObject) =>
+              _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias) =>
         case node if node.nodeName == "StreamingRelationV2" =>
         case node =>
           throwError(s"Continuous processing does not support ${node.nodeName} operations.")

http://git-wip-us.apache.org/repos/asf/spark/blob/14291b06/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index f5884b9..ef74efe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -171,6 +171,25 @@ class ContinuousSuite extends ContinuousSuiteBase {
       "Continuous processing does not support current time operations."))
   }
 
+  test("subquery alias") {
+    val df = spark.readStream
+      .format("rate")
+      .option("numPartitions", "5")
+      .option("rowsPerSecond", "5")
+      .load()
+      .createOrReplaceTempView("rate")
+    val test = spark.sql("select value from rate where value > 5")
+
+    testStream(test, useV2Sink = true)(
+      StartStream(longContinuousTrigger),
+      AwaitEpoch(0),
+      Execute(waitForRateSourceTriggers(_, 2)),
+      IncrementEpoch(),
+      Execute(waitForRateSourceTriggers(_, 4)),
+      IncrementEpoch(),
+      CheckAnswerRowsContains(scala.Range(6, 20).map(Row(_))))
+  }
+
   test("repeatedly restart") {
     val df = spark.readStream
       .format("rate")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org