Posted to commits@spark.apache.org by zs...@apache.org on 2017/07/14 21:37:29 UTC

spark git commit: [SPARK-21421][SS] Add the query id as a local property to allow sources and sinks to use it

Repository: spark
Updated Branches:
  refs/heads/master 601a237b3 -> 2d968a07d


[SPARK-21421][SS] Add the query id as a local property to allow sources and sinks to use it

## What changes were proposed in this pull request?

Add the query id as a local property to allow sources and sinks to use it.
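
For illustration, a custom sink could read this property when a batch is written. The following is a minimal sketch that is not part of this commit: it assumes a DataSource V1 `Sink`, and `QueryIdAwareSink` is a hypothetical name; only `StreamExecution.QUERY_ID_KEY` (`"sql.streaming.queryId"`) comes from this change.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.{Sink, StreamExecution}

// Hypothetical sink that tags its output with the id of the owning query.
// StreamExecution sets "sql.streaming.queryId" as a local property on the
// thread that drives the query, so it is visible here as well as in
// Source.getOffset/getBatch.
class QueryIdAwareSink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val queryId = data.sparkSession.sparkContext
      .getLocalProperty(StreamExecution.QUERY_ID_KEY)
    // A real sink would use queryId to namespace its output, e.g. per-query
    // directories or metrics tags. Here we just print it.
    println(s"Writing batch $batchId for query $queryId")
  }
}
```

In practice such a sink would be registered through a `StreamSinkProvider`; the same property is readable from a source, which is what the new test below verifies.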

## How was this patch tested?

Added a new unit test that reads the query id from a source via the local property and checks it against the query's `id` (and not its `runId`).

Author: Shixiong Zhu <sh...@databricks.com>

Closes #18638 from zsxwing/SPARK-21421.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2d968a07
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2d968a07
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2d968a07

Branch: refs/heads/master
Commit: 2d968a07d211688a9c588deb859667dd8b653b27
Parents: 601a237
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Fri Jul 14 14:37:27 2017 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Fri Jul 14 14:37:27 2017 -0700

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala   |  4 +++
 .../sql/streaming/StreamingQuerySuite.scala     | 27 ++++++++++++++++++++
 2 files changed, 31 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2d968a07/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 10c42a7..5ee596e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -263,6 +263,7 @@ class StreamExecution(
     try {
       sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString,
         interruptOnCancel = true)
+      sparkSession.sparkContext.setLocalProperty(StreamExecution.QUERY_ID_KEY, id.toString)
       if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
         sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
       }
@@ -842,6 +843,9 @@ class StreamExecution(
   }
 }
 
+object StreamExecution {
+  val QUERY_ID_KEY = "sql.streaming.queryId"
+}
 
 /**
  * A special thread to run the stream query. Some codes require to run in the StreamExecutionThread

http://git-wip-us.apache.org/repos/asf/spark/blob/2d968a07/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 0925646..41f73b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -613,6 +613,33 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
+  test("get the query id in source") {
+    @volatile var queryId: String = null
+    val source = new Source {
+      override def stop(): Unit = {}
+      override def getOffset: Option[Offset] = {
+        queryId = spark.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+        None
+      }
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = spark.emptyDataFrame
+      override def schema: StructType = MockSourceProvider.fakeSchema
+    }
+
+    MockSourceProvider.withMockSources(source) {
+      val df = spark.readStream
+        .format("org.apache.spark.sql.streaming.util.MockSourceProvider")
+        .load()
+      testStream(df)(
+        AssertOnQuery { sq =>
+          sq.processAllAvailable()
+          assert(sq.id.toString === queryId)
+          assert(sq.runId.toString !== queryId)
+          true
+        }
+      )
+    }
+  }
+
   /** Create a streaming DF that only execute one batch in which it returns the given static DF */
   private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
     require(!triggerDF.isStreaming)

