You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2018/05/09 18:32:20 UTC
spark git commit: [SPARK-24214][SS] Fix toJSON for
StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation
Repository: spark
Updated Branches:
refs/heads/master 7aaa148f5 -> fd1179c17
[SPARK-24214][SS] Fix toJSON for StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation
## What changes were proposed in this pull request?
We should override "otherCopyArgs" to provide the SparkSession parameter; otherwise, TreeNode.toJSON cannot get the full constructor parameter list.
## How was this patch tested?
The new unit test.
Author: Shixiong Zhu <zs...@gmail.com>
Closes #21275 from zsxwing/SPARK-24214.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd1179c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd1179c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd1179c1
Branch: refs/heads/master
Commit: fd1179c17273283d32f275d5cd5f97aaa2aca1f7
Parents: 7aaa148
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Wed May 9 11:32:17 2018 -0700
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Wed May 9 11:32:17 2018 -0700
----------------------------------------------------------------------
.../sql/execution/streaming/StreamingRelation.scala | 3 +++
.../spark/sql/streaming/StreamingQuerySuite.scala | 15 +++++++++++++++
2 files changed, 18 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/fd1179c1/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index f02d3a2..24195b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -66,6 +66,7 @@ case class StreamingExecutionRelation(
output: Seq[Attribute])(session: SparkSession)
extends LeafNode with MultiInstanceRelation {
+ override def otherCopyArgs: Seq[AnyRef] = session :: Nil
override def isStreaming: Boolean = true
override def toString: String = source.toString
@@ -97,6 +98,7 @@ case class StreamingRelationV2(
output: Seq[Attribute],
v1Relation: Option[StreamingRelation])(session: SparkSession)
extends LeafNode with MultiInstanceRelation {
+ override def otherCopyArgs: Seq[AnyRef] = session :: Nil
override def isStreaming: Boolean = true
override def toString: String = sourceName
@@ -116,6 +118,7 @@ case class ContinuousExecutionRelation(
output: Seq[Attribute])(session: SparkSession)
extends LeafNode with MultiInstanceRelation {
+ override def otherCopyArgs: Seq[AnyRef] = session :: Nil
override def isStreaming: Boolean = true
override def toString: String = source.toString
http://git-wip-us.apache.org/repos/asf/spark/blob/fd1179c1/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 0cb2375..5798699 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -831,6 +831,21 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
CheckLastBatch(("A", 1)))
}
+ test("StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation.toJSON " +
+ "should not fail") {
+ val df = spark.readStream.format("rate").load()
+ assert(df.logicalPlan.toJSON.contains("StreamingRelationV2"))
+
+ testStream(df)(
+ AssertOnQuery(_.logicalPlan.toJSON.contains("StreamingExecutionRelation"))
+ )
+
+ testStream(df, useV2Sink = true)(
+ StartStream(trigger = Trigger.Continuous(100)),
+ AssertOnQuery(_.logicalPlan.toJSON.contains("ContinuousExecutionRelation"))
+ )
+ }
+
/** Create a streaming DF that only execute one batch in which it returns the given static DF */
private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
require(!triggerDF.isStreaming)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org