You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by br...@apache.org on 2019/11/13 17:00:16 UTC

[spark] branch master updated: [SPARK-29568][SS] Stop existing running streams when a new stream is launched

This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 363af16  [SPARK-29568][SS] Stop existing running streams when a new stream is launched
363af16 is described below

commit 363af16c72abe19fc5cc5b5bdf9d8dc34975f2ba
Author: Burak Yavuz <br...@gmail.com>
AuthorDate: Wed Nov 13 08:59:46 2019 -0800

    [SPARK-29568][SS] Stop existing running streams when a new stream is launched
    
    ### What changes were proposed in this pull request?
    
    This PR adds a SQL conf, `spark.sql.streaming.stopActiveRunOnRestart`. When this conf is `true` (the default), an already running stream is stopped if a new copy of it is launched on the same checkpoint location.
    
    ### Why are the changes needed?
    
    In multi-tenant environments where you have multiple SparkSessions, you can accidentally start multiple copies of the same stream (i.e. streams using the same checkpoint location). This causes every newly launched copy of the stream to fail. However, sometimes you may want to turn off the old stream instead, as the old stream may have turned into a zombie (you no longer have access to its query handle or SparkSession).
    
    It would be nice to have a SQL flag that allows the stopping of the old stream for such zombie cases.
    
    ### Does this PR introduce any user-facing change?
    
    Yes. Now by default, if you launch a new copy of an already running stream on a multi-tenant cluster, the existing stream will be stopped.
    
    ### How was this patch tested?
    
    Unit tests in StreamingQueryManagerSuite
    
    Closes #26225 from brkyvz/stopStream.
    
    Lead-authored-by: Burak Yavuz <br...@gmail.com>
    Co-authored-by: Burak Yavuz <bu...@databricks.com>
    Signed-off-by: Burak Yavuz <br...@gmail.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |   9 ++
 .../apache/spark/sql/internal/SharedState.scala    |  10 +-
 .../sql/streaming/StreamingQueryManager.scala      |  82 +++++++++----
 .../sql/streaming/StreamingQueryManagerSuite.scala | 134 ++++++++++++++++-----
 .../spark/sql/streaming/StreamingQuerySuite.scala  |   8 +-
 5 files changed, 184 insertions(+), 59 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 98acace..759586a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1093,6 +1093,15 @@ object SQLConf {
       .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
       .createWithDefault(2)
 
+  val STREAMING_STOP_ACTIVE_RUN_ON_RESTART =
+    buildConf("spark.sql.streaming.stopActiveRunOnRestart")
+    .doc("Running multiple runs of the same streaming query concurrently is not supported. " +
+      "If we find a concurrent active run for a streaming query (in the same or different " +
+      "SparkSessions on the same cluster) and this flag is true, we will stop the old streaming " +
+      "query run to start the new one.")
+    .booleanConf
+    .createWithDefault(true)
+
   val STREAMING_JOIN_STATE_FORMAT_VERSION =
     buildConf("spark.sql.streaming.join.stateFormatVersion")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index d097f9f..b810bed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
 import java.net.URL
 import java.util.{Locale, UUID}
 import java.util.concurrent.ConcurrentHashMap
+import javax.annotation.concurrent.GuardedBy
 
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
@@ -32,9 +33,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.CacheManager
+import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab}
 import org.apache.spark.sql.internal.StaticSQLConf._
-import org.apache.spark.sql.streaming.StreamingQueryManager
+import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.status.ElementTrackingStore
 import org.apache.spark.util.Utils
 
@@ -112,11 +114,15 @@ private[sql] class SharedState(
    */
   val cacheManager: CacheManager = new CacheManager
 
+  /** A global lock for all streaming query lifecycle tracking and management. */
+  private[sql] val activeQueriesLock = new Object
+
   /**
    * A map of active streaming queries to the session specific StreamingQueryManager that manages
    * the lifecycle of that stream.
    */
-  private[sql] val activeStreamingQueries = new ConcurrentHashMap[UUID, StreamingQueryManager]()
+  @GuardedBy("activeQueriesLock")
+  private[sql] val activeStreamingQueries = new ConcurrentHashMap[UUID, StreamExecution]()
 
   /**
    * A status store to query SQL status/metrics of this Spark application, based on SQL-specific
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 9b43a83..e64f67c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.streaming
 
-import java.util.UUID
+import java.util.{ConcurrentModificationException, UUID}
 import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy
 
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS
-import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
 /**
  * A class to manage all the [[StreamingQuery]] active in a `SparkSession`.
@@ -51,9 +51,10 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
     StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
   private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus)
 
-  @GuardedBy("activeQueriesLock")
+  @GuardedBy("activeQueriesSharedLock")
   private val activeQueries = new mutable.HashMap[UUID, StreamingQuery]
-  private val activeQueriesLock = new Object
+  // A global lock to keep track of active streaming queries across Spark sessions
+  private val activeQueriesSharedLock = sparkSession.sharedState.activeQueriesLock
   private val awaitTerminationLock = new Object
 
   @GuardedBy("awaitTerminationLock")
@@ -77,7 +78,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
    *
    * @since 2.0.0
    */
-  def active: Array[StreamingQuery] = activeQueriesLock.synchronized {
+  def active: Array[StreamingQuery] = activeQueriesSharedLock.synchronized {
     activeQueries.values.toArray
   }
 
@@ -86,7 +87,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
    *
    * @since 2.1.0
    */
-  def get(id: UUID): StreamingQuery = activeQueriesLock.synchronized {
+  def get(id: UUID): StreamingQuery = activeQueriesSharedLock.synchronized {
     activeQueries.get(id).orNull
   }
 
@@ -343,27 +344,61 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
       trigger,
       triggerClock)
 
-    activeQueriesLock.synchronized {
+    // The following code block checks if a stream with the same name or id is running. Then it
+    // returns an Option of an already active stream to stop outside of the lock
+    // to avoid a deadlock.
+    val activeRunOpt = activeQueriesSharedLock.synchronized {
       // Make sure no other query with same name is active
       userSpecifiedName.foreach { name =>
         if (activeQueries.values.exists(_.name == name)) {
-          throw new IllegalArgumentException(
-            s"Cannot start query with name $name as a query with that name is already active")
+          throw new IllegalArgumentException(s"Cannot start query with name $name as a query " +
+            s"with that name is already active in this SparkSession")
         }
       }
 
       // Make sure no other query with same id is active across all sessions
-      val activeOption =
-        Option(sparkSession.sharedState.activeStreamingQueries.putIfAbsent(query.id, this))
-      if (activeOption.isDefined || activeQueries.values.exists(_.id == query.id)) {
-        throw new IllegalStateException(
-          s"Cannot start query with id ${query.id} as another query with same id is " +
-            s"already active. Perhaps you are attempting to restart a query from checkpoint " +
-            s"that is already active.")
+      val activeOption = Option(sparkSession.sharedState.activeStreamingQueries.get(query.id))
+        .orElse(activeQueries.get(query.id)) // shouldn't be needed but paranoia ...
+
+      val shouldStopActiveRun =
+        sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART)
+      if (activeOption.isDefined) {
+        if (shouldStopActiveRun) {
+          val oldQuery = activeOption.get
+          logWarning(s"Stopping existing streaming query [id=${query.id}, " +
+            s"runId=${oldQuery.runId}], as a new run is being started.")
+          Some(oldQuery)
+        } else {
+          throw new IllegalStateException(
+            s"Cannot start query with id ${query.id} as another query with same id is " +
+              s"already active. Perhaps you are attempting to restart a query from checkpoint " +
+              s"that is already active. You may stop the old query by setting the SQL " +
+              "configuration: " +
+              s"""spark.conf.set("${SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key}", true) """ +
+              "and retry.")
+        }
+      } else {
+        // nothing to stop so, no-op
+        None
       }
+    }
 
+    // stop() will clear the queryId from activeStreamingQueries as well as activeQueries
+    activeRunOpt.foreach(_.stop())
+
+    activeQueriesSharedLock.synchronized {
+      // We still can have a race condition when two concurrent instances try to start the same
+      // stream, while a third one was already active and stopped above. In this case, we throw a
+      // ConcurrentModificationException.
+      val oldActiveQuery = sparkSession.sharedState.activeStreamingQueries.put(
+        query.id, query.streamingQuery) // we need to put the StreamExecution, not the wrapper
+      if (oldActiveQuery != null) {
+        throw new ConcurrentModificationException(
+          "Another instance of this query was just started by a concurrent session.")
+      }
       activeQueries.put(query.id, query)
     }
+
     try {
       // When starting a query, it will call `StreamingQueryListener.onQueryStarted` synchronously.
       // As it's provided by the user and can run arbitrary codes, we must not hold any lock here.
@@ -372,7 +407,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
       query.streamingQuery.start()
     } catch {
       case e: Throwable =>
-        unregisterTerminatedStream(query.id)
+        unregisterTerminatedStream(query)
         throw e
     }
     query
@@ -380,7 +415,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
 
   /** Notify (by the StreamingQuery) that the query has been terminated */
   private[sql] def notifyQueryTermination(terminatedQuery: StreamingQuery): Unit = {
-    unregisterTerminatedStream(terminatedQuery.id)
+    unregisterTerminatedStream(terminatedQuery)
     awaitTerminationLock.synchronized {
       if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) {
         lastTerminatedQuery = terminatedQuery
@@ -390,11 +425,12 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
     stateStoreCoordinator.deactivateInstances(terminatedQuery.runId)
   }
 
-  private def unregisterTerminatedStream(terminatedQueryId: UUID): Unit = {
-    activeQueriesLock.synchronized {
-      // remove from shared state only if the streaming query manager also matches
-      sparkSession.sharedState.activeStreamingQueries.remove(terminatedQueryId, this)
-      activeQueries -= terminatedQueryId
+  private def unregisterTerminatedStream(terminatedQuery: StreamingQuery): Unit = {
+    activeQueriesSharedLock.synchronized {
+      // remove from shared state only if the streaming execution also matches
+      sparkSession.sharedState.activeStreamingQueries.remove(
+        terminatedQuery.id, terminatedQuery)
+      activeQueries -= terminatedQuery.id
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 09580b9..96f7efe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.{Dataset, Encoders}
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.util.BlockingSource
 import org.apache.spark.util.Utils
 
@@ -274,48 +275,119 @@ class StreamingQueryManagerSuite extends StreamTest {
   }
 
   testQuietly("can't start multiple instances of the same streaming query in the same session") {
-    withTempDir { dir =>
-      val (ms1, ds1) = makeDataset
-      val (ms2, ds2) = makeDataset
-      val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
-      val dataLocation = new File(dir, "data").getCanonicalPath
-
-      val query1 = ds1.writeStream.format("parquet")
-        .option("checkpointLocation", chkLocation).start(dataLocation)
-      ms1.addData(1, 2, 3)
-      try {
-        val e = intercept[IllegalStateException] {
-          ds2.writeStream.format("parquet")
+    withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "false") {
+      withTempDir { dir =>
+        val (ms1, ds1) = makeDataset
+        val (ms2, ds2) = makeDataset
+        val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
+        val dataLocation = new File(dir, "data").getCanonicalPath
+
+        val query1 = ds1.writeStream.format("parquet")
+          .option("checkpointLocation", chkLocation).start(dataLocation)
+        ms1.addData(1, 2, 3)
+        try {
+          val e = intercept[IllegalStateException] {
+            ds2.writeStream.format("parquet")
+              .option("checkpointLocation", chkLocation).start(dataLocation)
+          }
+          assert(e.getMessage.contains("same id"))
+        } finally {
+          spark.streams.active.foreach(_.stop())
+        }
+      }
+    }
+  }
+
+  testQuietly("new instance of the same streaming query stops old query in the same session") {
+    failAfter(90 seconds) {
+      withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "true") {
+        withTempDir { dir =>
+          val (ms1, ds1) = makeDataset
+          val (ms2, ds2) = makeDataset
+          val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
+          val dataLocation = new File(dir, "data").getCanonicalPath
+
+          val query1 = ds1.writeStream.format("parquet")
+            .option("checkpointLocation", chkLocation).start(dataLocation)
+          ms1.addData(1, 2, 3)
+          val query2 = ds2.writeStream.format("parquet")
             .option("checkpointLocation", chkLocation).start(dataLocation)
+          try {
+            ms2.addData(1, 2, 3)
+            query2.processAllAvailable()
+            assert(spark.sharedState.activeStreamingQueries.get(query2.id) ===
+              query2.asInstanceOf[StreamingQueryWrapper].streamingQuery,
+              "The correct streaming query is not being tracked in global state")
+
+            assert(!query1.isActive,
+              "First query should have stopped before starting the second query")
+          } finally {
+            spark.streams.active.foreach(_.stop())
+          }
         }
-        assert(e.getMessage.contains("same id"))
-      } finally {
-        query1.stop()
       }
     }
   }
 
   testQuietly(
     "can't start multiple instances of the same streaming query in the different sessions") {
-    withTempDir { dir =>
-      val session2 = spark.cloneSession()
-
-      val ms1 = MemoryStream(Encoders.INT, spark.sqlContext)
-      val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
-      val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
-      val dataLocation = new File(dir, "data").getCanonicalPath
+    withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "false") {
+      withTempDir { dir =>
+        val session2 = spark.cloneSession()
+
+        val ms1 = MemoryStream(Encoders.INT, spark.sqlContext)
+        val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
+        val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
+        val dataLocation = new File(dir, "data").getCanonicalPath
+
+        val query1 = ms1.toDS().writeStream.format("parquet")
+          .option("checkpointLocation", chkLocation).start(dataLocation)
+        ms1.addData(1, 2, 3)
+        try {
+          val e = intercept[IllegalStateException] {
+            ds2.writeStream.format("parquet")
+              .option("checkpointLocation", chkLocation).start(dataLocation)
+          }
+          assert(e.getMessage.contains("same id"))
+        } finally {
+          spark.streams.active.foreach(_.stop())
+          session2.streams.active.foreach(_.stop())
+        }
+      }
+    }
+  }
 
-      val query1 = ms1.toDS().writeStream.format("parquet")
-        .option("checkpointLocation", chkLocation).start(dataLocation)
-      ms1.addData(1, 2, 3)
-      try {
-        val e = intercept[IllegalStateException] {
-          ds2.writeStream.format("parquet")
+  testQuietly(
+    "new instance of the same streaming query stops old query in a different session") {
+    failAfter(90 seconds) {
+      withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "true") {
+        withTempDir { dir =>
+          val session2 = spark.cloneSession()
+
+          val ms1 = MemoryStream(Encoders.INT, spark.sqlContext)
+          val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
+          val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
+          val dataLocation = new File(dir, "data").getCanonicalPath
+
+          val query1 = ms1.toDS().writeStream.format("parquet")
+            .option("checkpointLocation", chkLocation).start(dataLocation)
+          ms1.addData(1, 2, 3)
+          val query2 = ds2.writeStream.format("parquet")
             .option("checkpointLocation", chkLocation).start(dataLocation)
+          try {
+            ms1.addData(1, 2, 3)
+            query2.processAllAvailable()
+            assert(spark.sharedState.activeStreamingQueries.get(query2.id) ===
+              query2.asInstanceOf[StreamingQueryWrapper].streamingQuery,
+              "The correct streaming execution is not being tracked in global state")
+
+            assert(!query1.isActive,
+              "First query should have stopped before starting the second query")
+          } finally {
+            spark.streams.active.foreach(_.stop())
+            session2.streams.active.foreach(_.stop())
+          }
         }
-        assert(e.getMessage.contains("same id"))
-      } finally {
-        query1.stop()
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 760731d..4121f49 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -123,9 +123,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       assert(q3.runId !== q4.runId)
 
       // Only one query with same id can be active
-      val q5 = startQuery(restart = false)
-      val e = intercept[IllegalStateException] {
-        startQuery(restart = true)
+      withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "false") {
+        val q5 = startQuery(restart = false)
+        val e = intercept[IllegalStateException] {
+          startQuery(restart = true)
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org