You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2016/12/13 06:31:28 UTC

spark git commit: [SPARK-18796][SS] StreamingQueryManager should not block when starting a query

Repository: spark
Updated Branches:
  refs/heads/master bc59951ba -> 417e45c58


[SPARK-18796][SS] StreamingQueryManager should not block when starting a query

## What changes were proposed in this pull request?

Major change in this PR:
- Add `pendingQueryNames` and `pendingQueryIds` to track that are going to start but not yet put into `activeQueries` so that we don't need to hold a lock when starting a query.

Minor changes:
- Fix a potential NPE when the user sets `checkpointLocation` using SQLConf but doesn't specify a query name.
- Add missing docs in `StreamingQueryListener`

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <sh...@databricks.com>

Closes #16220 from zsxwing/SPARK-18796.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/417e45c5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/417e45c5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/417e45c5

Branch: refs/heads/master
Commit: 417e45c58484a6b984ad2ce9ba8f47aa0a9983fd
Parents: bc59951
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Mon Dec 12 22:31:22 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Dec 12 22:31:22 2016 -0800

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala   |   5 +-
 .../sql/streaming/StreamingQueryListener.scala  |   7 +-
 .../sql/streaming/StreamingQueryManager.scala   | 148 ++++++++++++-------
 .../test/DataStreamReaderWriterSuite.scala      |  56 +++++++
 4 files changed, 158 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/417e45c5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 48eee42..9fe6819 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -223,7 +223,8 @@ class StreamExecution(
         sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
       }
 
-      postEvent(new QueryStartedEvent(id, runId, name)) // Assumption: Does not throw exception.
+      // `postEvent` does not throw non fatal exception.
+      postEvent(new QueryStartedEvent(id, runId, name))
 
       // Unblock starting thread
       startLatch.countDown()
@@ -286,7 +287,7 @@ class StreamExecution(
           e,
           committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString,
           availableOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString)
-        logError(s"Query $name terminated with error", e)
+        logError(s"Query $prettyIdString terminated with error", e)
         updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
         // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
         // handle them

http://git-wip-us.apache.org/repos/asf/spark/blob/417e45c5/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index 6fc859d..8177332 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -83,6 +83,9 @@ object StreamingQueryListener {
   /**
    * :: Experimental ::
    * Event representing the start of a query
+   * @param id An unique query id that persists across restarts. See `StreamingQuery.id()`.
+   * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
+   * @param name User-specified name of the query, null if not specified.
    * @since 2.1.0
    */
   @Experimental
@@ -94,6 +97,7 @@ object StreamingQueryListener {
   /**
    * :: Experimental ::
    * Event representing any progress updates in a query.
+   * @param progress The query progress updates.
    * @since 2.1.0
    */
   @Experimental
@@ -103,7 +107,8 @@ object StreamingQueryListener {
    * :: Experimental ::
    * Event representing that termination of a query.
    *
-   * @param id The query id.
+   * @param id An unique query id that persists across restarts. See `StreamingQuery.id()`.
+   * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
    * @param exception The exception message of the query if the query was terminated
    *                  with an exception. Otherwise, it will be `None`.
    * @since 2.1.0

http://git-wip-us.apache.org/repos/asf/spark/blob/417e45c5/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 52d0791..6ebd706 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import java.util.UUID
-import java.util.concurrent.atomic.AtomicLong
+import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
 
@@ -44,10 +44,13 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
   private[sql] val stateStoreCoordinator =
     StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
   private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus)
+
+  @GuardedBy("activeQueriesLock")
   private val activeQueries = new mutable.HashMap[UUID, StreamingQuery]
   private val activeQueriesLock = new Object
   private val awaitTerminationLock = new Object
 
+  @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null
 
   /**
@@ -181,8 +184,65 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
     listenerBus.post(event)
   }
 
+  private def createQuery(
+      userSpecifiedName: Option[String],
+      userSpecifiedCheckpointLocation: Option[String],
+      df: DataFrame,
+      sink: Sink,
+      outputMode: OutputMode,
+      useTempCheckpointLocation: Boolean,
+      recoverFromCheckpointLocation: Boolean,
+      trigger: Trigger,
+      triggerClock: Clock): StreamExecution = {
+    val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
+      new Path(userSpecified).toUri.toString
+    }.orElse {
+      df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
+        new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
+      }
+    }.getOrElse {
+      if (useTempCheckpointLocation) {
+        Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
+      } else {
+        throw new AnalysisException(
+          "checkpointLocation must be specified either " +
+            """through option("checkpointLocation", ...) or """ +
+            s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
+      }
+    }
+
+    // If offsets have already been created, we trying to resume a query.
+    if (!recoverFromCheckpointLocation) {
+      val checkpointPath = new Path(checkpointLocation, "offsets")
+      val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
+      if (fs.exists(checkpointPath)) {
+        throw new AnalysisException(
+          s"This query does not support recovering from checkpoint location. " +
+            s"Delete $checkpointPath to start over.")
+      }
+    }
+
+    val analyzedPlan = df.queryExecution.analyzed
+    df.queryExecution.assertAnalyzed()
+
+    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
+      UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
+    }
+
+    new StreamExecution(
+      sparkSession,
+      userSpecifiedName.orNull,
+      checkpointLocation,
+      analyzedPlan,
+      sink,
+      trigger,
+      triggerClock,
+      outputMode)
+  }
+
   /**
    * Start a [[StreamingQuery]].
+   *
    * @param userSpecifiedName Query name optionally specified by the user.
    * @param userSpecifiedCheckpointLocation  Checkpoint location optionally specified by the user.
    * @param df Streaming DataFrame.
@@ -206,72 +266,50 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
       recoverFromCheckpointLocation: Boolean = true,
       trigger: Trigger = ProcessingTime(0),
       triggerClock: Clock = new SystemClock()): StreamingQuery = {
-    activeQueriesLock.synchronized {
-      val name = userSpecifiedName match {
-        case Some(n) =>
-          if (activeQueries.values.exists(_.name == userSpecifiedName.get)) {
-            throw new IllegalArgumentException(
-              s"Cannot start query with name $n as a query with that name is already active")
-          }
-          n
-        case None => null
-      }
-      val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
-        new Path(userSpecified).toUri.toString
-      }.orElse {
-        df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
-          new Path(location, name).toUri.toString
-        }
-      }.getOrElse {
-        if (useTempCheckpointLocation) {
-          Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
-        } else {
-          throw new AnalysisException(
-            "checkpointLocation must be specified either " +
-              """through option("checkpointLocation", ...) or """ +
-              s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
-        }
-      }
+    val query = createQuery(
+      userSpecifiedName,
+      userSpecifiedCheckpointLocation,
+      df,
+      sink,
+      outputMode,
+      useTempCheckpointLocation,
+      recoverFromCheckpointLocation,
+      trigger,
+      triggerClock)
 
-      // If offsets have already been created, we trying to resume a query.
-      if (!recoverFromCheckpointLocation) {
-        val checkpointPath = new Path(checkpointLocation, "offsets")
-        val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
-        if (fs.exists(checkpointPath)) {
-          throw new AnalysisException(
-            s"This query does not support recovering from checkpoint location. " +
-              s"Delete $checkpointPath to start over.")
+    activeQueriesLock.synchronized {
+      // Make sure no other query with same name is active
+      userSpecifiedName.foreach { name =>
+        if (activeQueries.values.exists(_.name == name)) {
+          throw new IllegalArgumentException(
+            s"Cannot start query with name $name as a query with that name is already active")
         }
       }
 
-      val analyzedPlan = df.queryExecution.analyzed
-      df.queryExecution.assertAnalyzed()
-
-      if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
-        UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
-      }
-
-      val query = new StreamExecution(
-        sparkSession,
-        name,
-        checkpointLocation,
-        analyzedPlan,
-        sink,
-        trigger,
-        triggerClock,
-        outputMode)
-
+      // Make sure no other query with same id is active
       if (activeQueries.values.exists(_.id == query.id)) {
         throw new IllegalStateException(
           s"Cannot start query with id ${query.id} as another query with same id is " +
-            s"already active. Perhaps you are attempting to restart a query from checkpoint" +
+            s"already active. Perhaps you are attempting to restart a query from checkpoint " +
             s"that is already active.")
       }
 
-      query.start()
       activeQueries.put(query.id, query)
-      query
     }
+    try {
+      // When starting a query, it will call `StreamingQueryListener.onQueryStarted` synchronously.
+      // As it's provided by the user and can run arbitrary codes, we must not hold any lock here.
+      // Otherwise, it's easy to cause dead-lock, or block too long if the user codes take a long
+      // time to finish.
+      query.start()
+    } catch {
+      case e: Throwable =>
+        activeQueriesLock.synchronized {
+          activeQueries -= query.id
+        }
+        throw e
+    }
+    query
   }
 
   /** Notify (by the StreamingQuery) that the query has been terminated */

http://git-wip-us.apache.org/repos/asf/spark/blob/417e45c5/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 0eb95a0..f4a6290 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -27,6 +27,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, StreamingQuery, StreamTest}
 import org.apache.spark.sql.types._
@@ -575,4 +576,59 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       sq.stop()
     }
   }
+
+  test("user specified checkpointLocation precedes SQLConf") {
+    import testImplicits._
+    withTempDir { checkpointPath =>
+      withTempPath { userCheckpointPath =>
+        assert(!userCheckpointPath.exists(), s"$userCheckpointPath should not exist")
+        withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath) {
+          val queryName = "test_query"
+          val ds = MemoryStream[Int].toDS
+          ds.writeStream
+            .format("memory")
+            .queryName(queryName)
+            .option("checkpointLocation", userCheckpointPath.getAbsolutePath)
+            .start()
+            .stop()
+          assert(checkpointPath.listFiles().isEmpty,
+            "SQLConf path is used even if user specified checkpointLoc: " +
+              s"${checkpointPath.listFiles()} is not empty")
+          assert(userCheckpointPath.exists(),
+            s"The user specified checkpointLoc (userCheckpointPath) is not created")
+        }
+      }
+    }
+  }
+
+  test("use SQLConf checkpoint dir when checkpointLocation is not specified") {
+    import testImplicits._
+    withTempDir { checkpointPath =>
+      withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath) {
+        val queryName = "test_query"
+        val ds = MemoryStream[Int].toDS
+        ds.writeStream.format("memory").queryName(queryName).start().stop()
+        // Should use query name to create a folder in `checkpointPath`
+        val queryCheckpointDir = new File(checkpointPath, queryName)
+        assert(queryCheckpointDir.exists(), s"$queryCheckpointDir doesn't exist")
+        assert(
+          checkpointPath.listFiles().size === 1,
+          s"${checkpointPath.listFiles().toList} has 0 or more than 1 files ")
+      }
+    }
+  }
+
+  test("use SQLConf checkpoint dir when checkpointLocation is not specified without query name") {
+    import testImplicits._
+    withTempDir { checkpointPath =>
+      withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath) {
+        val ds = MemoryStream[Int].toDS
+        ds.writeStream.format("console").start().stop()
+        // Should create a random folder in `checkpointPath`
+        assert(
+          checkpointPath.listFiles().size === 1,
+          s"${checkpointPath.listFiles().toList} has 0 or more than 1 files ")
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org