Posted to commits@spark.apache.org by zs...@apache.org on 2016/12/10 06:49:59 UTC
spark git commit: [SPARK-18811] StreamSource resolution should happen in stream execution thread
Repository: spark
Updated Branches:
refs/heads/master 3e11d5bfe -> 63c915987
[SPARK-18811] StreamSource resolution should happen in stream execution thread
## What changes were proposed in this pull request?
When a stream is started, resolving its source, for example resolving partition columns, can take a long time. That work should not block the thread on which `query.start()` was called; it should instead happen on the stream execution thread, before any triggers start.
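For example (a hypothetical sketch, not code from this patch; the schema, paths, and formats are illustrative), `start()` now returns promptly even when partition discovery for the source is slow, because that work runs on the stream execution thread:

    import org.apache.spark.sql.types._

    // A file source whose partition discovery may take a while.
    val eventSchema = new StructType().add("id", LongType).add("ts", TimestampType)

    val query = spark.readStream
      .schema(eventSchema)
      .format("parquet")
      .load("/data/events")                        // resolution may be slow
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/ckpt")
      .start()                                     // returns without waiting

    // Meanwhile the execution thread reports its progress; this check is
    // timing-sensitive, which is why the test below polls with eventually().
    assert(query.status.message.contains("Initializing sources"))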
## How was this patch tested?
Added a unit test and verified that it fails without the code changes.
Author: Burak Yavuz <br...@gmail.com>
Closes #16238 from brkyvz/SPARK-18811.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63c91598
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63c91598
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63c91598
Branch: refs/heads/master
Commit: 63c9159870ee274c68e24360594ca01d476b9ace
Parents: 3e11d5b
Author: Burak Yavuz <br...@gmail.com>
Authored: Fri Dec 9 22:49:51 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Fri Dec 9 22:49:51 2016 -0800
----------------------------------------------------------------------
.../execution/streaming/StreamExecution.scala | 24 ++++++-
.../sql/streaming/StreamingQueryManager.scala | 14 +----
.../streaming/StreamingQueryManagerSuite.scala | 28 +++++++++
.../sql/streaming/util/DefaultSource.scala | 66 ++++++++++++++++++++
4 files changed, 116 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/63c91598/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 39be222..b52810d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -47,7 +47,7 @@ class StreamExecution(
override val sparkSession: SparkSession,
override val name: String,
checkpointRoot: String,
- val logicalPlan: LogicalPlan,
+ analyzedPlan: LogicalPlan,
val sink: Sink,
val trigger: Trigger,
val triggerClock: Clock,
@@ -115,12 +115,26 @@ class StreamExecution(
private val prettyIdString =
Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"
+ override lazy val logicalPlan: LogicalPlan = {
+ var nextSourceId = 0L
+ analyzedPlan.transform {
+ case StreamingRelation(dataSource, _, output) =>
+ // Materialize source to avoid creating it in every batch
+ val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
+ val source = dataSource.createSource(metadataPath)
+ nextSourceId += 1
+ // We still need to use the previous `output` instead of `source.schema`, because the
+ // attributes in "df.logicalPlan" have already been bound to the previous `output`.
+ StreamingExecutionRelation(source, output)
+ }
+ }
+
/** All stream sources present in the query plan. */
- protected val sources =
+ protected lazy val sources =
logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
/** A list of unique sources in the query plan. */
- private val uniqueSources = sources.distinct
+ private lazy val uniqueSources = sources.distinct
private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
@@ -214,6 +228,10 @@ class StreamExecution(
// While active, repeatedly attempt to run batches.
SparkSession.setActiveSession(sparkSession)
+ updateStatusMessage("Initializing sources")
+ // force initialization of the logical plan so that the sources can be created
+ logicalPlan
+
triggerExecutor.execute(() => {
startTrigger()
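The mechanism here is Scala's `lazy val`: the plan transform, and with it `dataSource.createSource()` and any slow partition resolution, runs on whichever thread first touches `logicalPlan`, and the run loop above forces it before the first trigger. A minimal sketch of the pattern (hypothetical names, not the actual StreamExecution code):

    class QueryLike {
      // Nothing runs until `logicalPlan` is first referenced.
      lazy val logicalPlan: String = {
        Thread.sleep(5000) // stands in for slow source resolution
        "resolved plan"
      }

      def start(): Thread = {
        val runner = new Thread(new Runnable {
          override def run(): Unit = {
            logicalPlan // force initialization on the execution thread
            // ... run triggers against the resolved plan ...
          }
        })
        runner.start()
        runner // returns immediately; the caller never waits
      }
    }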
http://git-wip-us.apache.org/repos/asf/spark/blob/63c91598/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index c6ab416..52d0791 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -251,23 +251,11 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
}
- var nextSourceId = 0L
-
- val logicalPlan = analyzedPlan.transform {
- case StreamingRelation(dataSource, _, output) =>
- // Materialize source to avoid creating it in every batch
- val metadataPath = s"$checkpointLocation/sources/$nextSourceId"
- val source = dataSource.createSource(metadataPath)
- nextSourceId += 1
- // We still need to use the previous `output` instead of `source.schema` as attributes in
- // "df.logicalPlan" has already used attributes of the previous `output`.
- StreamingExecutionRelation(source, output)
- }
val query = new StreamExecution(
sparkSession,
name,
checkpointLocation,
- logicalPlan,
+ analyzedPlan,
sink,
trigger,
triggerClock,
http://git-wip-us.apache.org/repos/asf/spark/blob/63c91598/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 268b8ff..d188319 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.streaming
+import java.util.concurrent.CountDownLatch
+
import scala.concurrent.Future
import scala.util.Random
import scala.util.control.NonFatal
@@ -213,6 +215,28 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
}
}
+ test("SPARK-18811: Source resolution should not block main thread") {
+ failAfter(streamingTimeout) {
+ StreamingQueryManagerSuite.latch = new CountDownLatch(1)
+ withTempDir { tempDir =>
+ // If source resolution happened on the main thread, it would block this `start()` call;
+ // now it should block only the stream execution thread.
+ val sq = spark.readStream
+ .format("org.apache.spark.sql.streaming.util.BlockingSource")
+ .load()
+ .writeStream
+ .format("org.apache.spark.sql.streaming.util.BlockingSource")
+ .option("checkpointLocation", tempDir.toString)
+ .start()
+ eventually(Timeout(streamingTimeout)) {
+ assert(sq.status.message.contains("Initializing sources"))
+ }
+ StreamingQueryManagerSuite.latch.countDown()
+ sq.stop()
+ }
+ }
+ }
+
/** Run a body of code by defining a query on each dataset */
private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = {
@@ -297,3 +321,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
(inputData, mapped)
}
}
+
+object StreamingQueryManagerSuite {
+ var latch: CountDownLatch = null
+}
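Note that the latch lives in a companion object because `BlockingSource` is instantiated reflectively from the class name passed to `.format(...)`, so the test has no way to hand it a latch through a constructor; shared static state is the only rendezvous point. A minimal illustration of that constraint (the class name is the one defined below):

    // The framework only ever sees a class name, so no constructor
    // arguments can be passed; the test and the provider must meet
    // through companion-object state such as the latch.
    val provider = Class.forName("org.apache.spark.sql.streaming.util.BlockingSource")
      .newInstance()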
http://git-wip-us.apache.org/repos/asf/spark/blob/63c91598/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
new file mode 100644
index 0000000..b0adf76
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.util
+
+import org.apache.spark.sql.{SQLContext, _}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source}
+import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryManagerSuite}
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+/** Dummy provider whose `createSource` call blocks until the test counts down a shared latch. */
+class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
+
+ private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+
+ override def sourceSchema(
+ spark: SQLContext,
+ schema: Option[StructType],
+ providerName: String,
+ parameters: Map[String, String]): (String, StructType) = {
+ ("dummySource", fakeSchema)
+ }
+
+ override def createSource(
+ spark: SQLContext,
+ metadataPath: String,
+ schema: Option[StructType],
+ providerName: String,
+ parameters: Map[String, String]): Source = {
+ StreamingQueryManagerSuite.latch.await()
+ new Source {
+ override def schema: StructType = fakeSchema
+ override def getOffset: Option[Offset] = Some(new LongOffset(0))
+ override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+ import spark.implicits._
+ Seq[Int]().toDS().toDF()
+ }
+ override def stop() {}
+ }
+ }
+
+ override def createSink(
+ spark: SQLContext,
+ parameters: Map[String, String],
+ partitionColumns: Seq[String],
+ outputMode: OutputMode): Sink = {
+ new Sink {
+ override def addBatch(batchId: Long, data: DataFrame): Unit = {}
+ }
+ }
+}