You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/12/10 06:49:59 UTC

spark git commit: [SPARK-18811] StreamSource resolution should happen in stream execution thread

Repository: spark
Updated Branches:
  refs/heads/master 3e11d5bfe -> 63c915987


[SPARK-18811] StreamSource resolution should happen in stream execution thread

## What changes were proposed in this pull request?

When a stream is started, resolving its source (for example, resolving partition columns) can take a long time. This work should not block the thread on which `query.start()` was called. Instead, it should happen in the stream execution thread, before any triggers are started.

## How was this patch tested?

Added a unit test, and verified that it fails without the code changes in this patch.

Author: Burak Yavuz <br...@gmail.com>

Closes #16238 from brkyvz/SPARK-18811.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63c91598
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63c91598
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63c91598

Branch: refs/heads/master
Commit: 63c9159870ee274c68e24360594ca01d476b9ace
Parents: 3e11d5b
Author: Burak Yavuz <br...@gmail.com>
Authored: Fri Dec 9 22:49:51 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Fri Dec 9 22:49:51 2016 -0800

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala   | 24 ++++++-
 .../sql/streaming/StreamingQueryManager.scala   | 14 +----
 .../streaming/StreamingQueryManagerSuite.scala  | 28 +++++++++
 .../sql/streaming/util/DefaultSource.scala      | 66 ++++++++++++++++++++
 4 files changed, 116 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/63c91598/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 39be222..b52810d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -47,7 +47,7 @@ class StreamExecution(
     override val sparkSession: SparkSession,
     override val name: String,
     checkpointRoot: String,
-    val logicalPlan: LogicalPlan,
+    analyzedPlan: LogicalPlan,
     val sink: Sink,
     val trigger: Trigger,
     val triggerClock: Clock,
@@ -115,12 +115,26 @@ class StreamExecution(
   private val prettyIdString =
     Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"
 
+  override lazy val logicalPlan: LogicalPlan = {
+    var nextSourceId = 0L
+    analyzedPlan.transform {
+      case StreamingRelation(dataSource, _, output) =>
+        // Materialize source to avoid creating it in every batch
+        val metadataPath = s"$checkpointRoot/sources/$nextSourceId"
+        val source = dataSource.createSource(metadataPath)
+        nextSourceId += 1
+        // We still need to use the previous `output` instead of `source.schema` as attributes in
+        // "df.logicalPlan" has already used attributes of the previous `output`.
+        StreamingExecutionRelation(source, output)
+    }
+  }
+
   /** All stream sources present in the query plan. */
-  protected val sources =
+  protected lazy val sources =
     logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
 
   /** A list of unique sources in the query plan. */
-  private val uniqueSources = sources.distinct
+  private lazy val uniqueSources = sources.distinct
 
   private val triggerExecutor = trigger match {
     case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
@@ -214,6 +228,10 @@ class StreamExecution(
       // While active, repeatedly attempt to run batches.
       SparkSession.setActiveSession(sparkSession)
 
+      updateStatusMessage("Initializing sources")
+      // force initialization of the logical plan so that the sources can be created
+      logicalPlan
+
       triggerExecutor.execute(() => {
         startTrigger()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/63c91598/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index c6ab416..52d0791 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -251,23 +251,11 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
         UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
       }
 
-      var nextSourceId = 0L
-
-      val logicalPlan = analyzedPlan.transform {
-        case StreamingRelation(dataSource, _, output) =>
-          // Materialize source to avoid creating it in every batch
-          val metadataPath = s"$checkpointLocation/sources/$nextSourceId"
-          val source = dataSource.createSource(metadataPath)
-          nextSourceId += 1
-          // We still need to use the previous `output` instead of `source.schema` as attributes in
-          // "df.logicalPlan" has already used attributes of the previous `output`.
-          StreamingExecutionRelation(source, output)
-      }
       val query = new StreamExecution(
         sparkSession,
         name,
         checkpointLocation,
-        logicalPlan,
+        analyzedPlan,
         sink,
         trigger,
         triggerClock,

http://git-wip-us.apache.org/repos/asf/spark/blob/63c91598/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 268b8ff..d188319 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.streaming
 
+import java.util.concurrent.CountDownLatch
+
 import scala.concurrent.Future
 import scala.util.Random
 import scala.util.control.NonFatal
@@ -213,6 +215,28 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
+  test("SPARK-18811: Source resolution should not block main thread") {
+    failAfter(streamingTimeout) {
+      StreamingQueryManagerSuite.latch = new CountDownLatch(1)
+      withTempDir { tempDir =>
+        // If source resolution happened on the main thread, it would block the start() call;
+        // now it should only block the stream execution thread.
+        val sq = spark.readStream
+          .format("org.apache.spark.sql.streaming.util.BlockingSource")
+          .load()
+          .writeStream
+          .format("org.apache.spark.sql.streaming.util.BlockingSource")
+          .option("checkpointLocation", tempDir.toString)
+          .start()
+        eventually(Timeout(streamingTimeout)) {
+          assert(sq.status.message.contains("Initializing sources"))
+        }
+        StreamingQueryManagerSuite.latch.countDown()
+        sq.stop()
+      }
+    }
+  }
+
 
   /** Run a body of code by defining a query on each dataset */
   private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = {
@@ -297,3 +321,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
     (inputData, mapped)
   }
 }
+
+object StreamingQueryManagerSuite {
+  var latch: CountDownLatch = null
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/63c91598/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
new file mode 100644
index 0000000..b0adf76
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.util
+
+import org.apache.spark.sql.{SQLContext, _}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source}
+import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryManagerSuite}
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+/** Dummy provider: `createSource` blocks until `StreamingQueryManagerSuite.latch` is released. */
+class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
+
+  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+
+  override def sourceSchema(
+      spark: SQLContext,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): (String, StructType) = {
+    ("dummySource", fakeSchema)
+  }
+
+  override def createSource(
+      spark: SQLContext,
+      metadataPath: String,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): Source = {
+    StreamingQueryManagerSuite.latch.await()
+    new Source {
+      override def schema: StructType = fakeSchema
+      override def getOffset: Option[Offset] = Some(new LongOffset(0))
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+        import spark.implicits._
+        Seq[Int]().toDS().toDF()
+      }
+      override def stop() {}
+    }
+  }
+
+  override def createSink(
+      spark: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink = {
+    new Sink {
+      override def addBatch(batchId: Long, data: DataFrame): Unit = {}
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org