You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/07/14 21:37:29 UTC
spark git commit: [SPARK-21421][SS] Add the query id as a local
property to allow sources and sinks to use it
Repository: spark
Updated Branches:
refs/heads/master 601a237b3 -> 2d968a07d
[SPARK-21421][SS] Add the query id as a local property to allow sources and sinks to use it
## What changes were proposed in this pull request?
Add the query id as a local property so that streaming sources and sinks can read it.
## How was this patch tested?
The new unit test.
Author: Shixiong Zhu <sh...@databricks.com>
Closes #18638 from zsxwing/SPARK-21421.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2d968a07
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2d968a07
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2d968a07
Branch: refs/heads/master
Commit: 2d968a07d211688a9c588deb859667dd8b653b27
Parents: 601a237
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Fri Jul 14 14:37:27 2017 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Fri Jul 14 14:37:27 2017 -0700
----------------------------------------------------------------------
.../execution/streaming/StreamExecution.scala | 4 +++
.../sql/streaming/StreamingQuerySuite.scala | 27 ++++++++++++++++++++
2 files changed, 31 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2d968a07/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 10c42a7..5ee596e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -263,6 +263,7 @@ class StreamExecution(
try {
sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString,
interruptOnCancel = true)
+ sparkSession.sparkContext.setLocalProperty(StreamExecution.QUERY_ID_KEY, id.toString)
if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
}
@@ -842,6 +843,9 @@ class StreamExecution(
}
}
+object StreamExecution {
+ val QUERY_ID_KEY = "sql.streaming.queryId"
+}
/**
* A special thread to run the stream query. Some codes require to run in the StreamExecutionThread
http://git-wip-us.apache.org/repos/asf/spark/blob/2d968a07/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 0925646..41f73b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -613,6 +613,33 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
}
}
+ test("get the query id in source") {
+ @volatile var queryId: String = null
+ val source = new Source {
+ override def stop(): Unit = {}
+ override def getOffset: Option[Offset] = {
+ queryId = spark.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+ None
+ }
+ override def getBatch(start: Option[Offset], end: Offset): DataFrame = spark.emptyDataFrame
+ override def schema: StructType = MockSourceProvider.fakeSchema
+ }
+
+ MockSourceProvider.withMockSources(source) {
+ val df = spark.readStream
+ .format("org.apache.spark.sql.streaming.util.MockSourceProvider")
+ .load()
+ testStream(df)(
+ AssertOnQuery { sq =>
+ sq.processAllAvailable()
+ assert(sq.id.toString === queryId)
+ assert(sq.runId.toString !== queryId)
+ true
+ }
+ )
+ }
+ }
+
/** Create a streaming DF that only execute one batch in which it returns the given static DF */
private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
require(!triggerDF.isStreaming)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org