You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2018/05/09 18:32:20 UTC

spark git commit: [SPARK-24214][SS] Fix toJSON for StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation

Repository: spark
Updated Branches:
  refs/heads/master 7aaa148f5 -> fd1179c17


[SPARK-24214][SS] Fix toJSON for StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation

## What changes were proposed in this pull request?

We should overwrite "otherCopyArgs" to provide the SparkSession parameter otherwise TreeNode.toJSON cannot get the full constructor parameter list.

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu <zs...@gmail.com>

Closes #21275 from zsxwing/SPARK-24214.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd1179c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd1179c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd1179c1

Branch: refs/heads/master
Commit: fd1179c17273283d32f275d5cd5f97aaa2aca1f7
Parents: 7aaa148
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Wed May 9 11:32:17 2018 -0700
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Wed May 9 11:32:17 2018 -0700

----------------------------------------------------------------------
 .../sql/execution/streaming/StreamingRelation.scala  |  3 +++
 .../spark/sql/streaming/StreamingQuerySuite.scala    | 15 +++++++++++++++
 2 files changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fd1179c1/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index f02d3a2..24195b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -66,6 +66,7 @@ case class StreamingExecutionRelation(
     output: Seq[Attribute])(session: SparkSession)
   extends LeafNode with MultiInstanceRelation {
 
+  override def otherCopyArgs: Seq[AnyRef] = session :: Nil
   override def isStreaming: Boolean = true
   override def toString: String = source.toString
 
@@ -97,6 +98,7 @@ case class StreamingRelationV2(
     output: Seq[Attribute],
     v1Relation: Option[StreamingRelation])(session: SparkSession)
   extends LeafNode with MultiInstanceRelation {
+  override def otherCopyArgs: Seq[AnyRef] = session :: Nil
   override def isStreaming: Boolean = true
   override def toString: String = sourceName
 
@@ -116,6 +118,7 @@ case class ContinuousExecutionRelation(
     output: Seq[Attribute])(session: SparkSession)
   extends LeafNode with MultiInstanceRelation {
 
+  override def otherCopyArgs: Seq[AnyRef] = session :: Nil
   override def isStreaming: Boolean = true
   override def toString: String = source.toString
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fd1179c1/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 0cb2375..5798699 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -831,6 +831,21 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       CheckLastBatch(("A", 1)))
   }
 
+  test("StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation.toJSON " +
+    "should not fail") {
+    val df = spark.readStream.format("rate").load()
+    assert(df.logicalPlan.toJSON.contains("StreamingRelationV2"))
+
+    testStream(df)(
+      AssertOnQuery(_.logicalPlan.toJSON.contains("StreamingExecutionRelation"))
+    )
+
+    testStream(df, useV2Sink = true)(
+      StartStream(trigger = Trigger.Continuous(100)),
+      AssertOnQuery(_.logicalPlan.toJSON.contains("ContinuousExecutionRelation"))
+    )
+  }
+
   /** Create a streaming DF that only execute one batch in which it returns the given static DF */
   private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
     require(!triggerDF.isStreaming)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org