You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2018/04/13 03:00:28 UTC
spark git commit: [SPARK-23748][SS] Fix SS continuous process doesn't
support SubqueryAlias issue
Repository: spark
Updated Branches:
refs/heads/master 682002b6d -> 14291b061
[SPARK-23748][SS] Fix SS continuous process doesn't support SubqueryAlias issue
## What changes were proposed in this pull request?
Current SS continuous doesn't support processing on temp table or `df.as("xxx")`, SS will throw an exception as LogicalPlan not supported, details described in [here](https://issues.apache.org/jira/browse/SPARK-23748).
So here propose to add this support.
## How was this patch tested?
new UT.
Author: jerryshao <ss...@hortonworks.com>
Closes #21017 from jerryshao/SPARK-23748.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14291b06
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14291b06
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14291b06
Branch: refs/heads/master
Commit: 14291b061b9b40eadbf4ed442f9a5021b8e09597
Parents: 682002b
Author: jerryshao <ss...@hortonworks.com>
Authored: Thu Apr 12 20:00:25 2018 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Apr 12 20:00:25 2018 -0700
----------------------------------------------------------------------
.../analysis/UnsupportedOperationChecker.scala | 2 +-
.../streaming/continuous/ContinuousSuite.scala | 19 +++++++++++++++++++
2 files changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/14291b06/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index b55043c..ff9d6d7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -345,7 +345,7 @@ object UnsupportedOperationChecker {
plan.foreachUp { implicit subPlan =>
subPlan match {
case (_: Project | _: Filter | _: MapElements | _: MapPartitions |
- _: DeserializeToObject | _: SerializeFromObject) =>
+ _: DeserializeToObject | _: SerializeFromObject | _: SubqueryAlias) =>
case node if node.nodeName == "StreamingRelationV2" =>
case node =>
throwError(s"Continuous processing does not support ${node.nodeName} operations.")
http://git-wip-us.apache.org/repos/asf/spark/blob/14291b06/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index f5884b9..ef74efe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -171,6 +171,25 @@ class ContinuousSuite extends ContinuousSuiteBase {
"Continuous processing does not support current time operations."))
}
+ test("subquery alias") {
+ val df = spark.readStream
+ .format("rate")
+ .option("numPartitions", "5")
+ .option("rowsPerSecond", "5")
+ .load()
+ .createOrReplaceTempView("rate")
+ val test = spark.sql("select value from rate where value > 5")
+
+ testStream(test, useV2Sink = true)(
+ StartStream(longContinuousTrigger),
+ AwaitEpoch(0),
+ Execute(waitForRateSourceTriggers(_, 2)),
+ IncrementEpoch(),
+ Execute(waitForRateSourceTriggers(_, 4)),
+ IncrementEpoch(),
+ CheckAnswerRowsContains(scala.Range(6, 20).map(Row(_))))
+ }
+
test("repeatedly restart") {
val df = spark.readStream
.format("rate")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org