You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2016/12/06 02:17:42 UTC

spark git commit: [SPARK-18657][SPARK-18668] Make StreamingQuery.id persists across restart and not auto-generate StreamingQuery.name

Repository: spark
Updated Branches:
  refs/heads/master 1b2785c3d -> bb57bfe97


[SPARK-18657][SPARK-18668] Make StreamingQuery.id persists across restart and not auto-generate StreamingQuery.name

## What changes were proposed in this pull request?
Here are the major changes in this PR.
- Added the ability to recover `StreamingQuery.id` from checkpoint location, by writing the id to `checkpointLoc/metadata`.
- Added `StreamingQuery.runId` which is unique for every query started and does not persist across restarts. This is to identify each restart of a query separately (same as earlier behavior of `id`).
- Removed auto-generation of `StreamingQuery.name`. The purpose of name was to have the ability to define an identifier across restarts, but since id is precisely that, there is no need for a auto-generated name. This means name becomes purely cosmetic, and is null by default.
- Added `runId` to `StreamingQueryListener` events and `StreamingQueryProgress`.

Implementation details
- Renamed existing `StreamExecutionMetadata` to `OffsetSeqMetadata`, and moved it to the file `OffsetSeq.scala`, because that is what this metadata is tied to. Also did some refactoring to make the code cleaner (got rid of a lot of `.json` and `.getOrElse("{}")`).
- Added the `id` as the new `StreamMetadata`.
- When a StreamingQuery is created it gets or writes the `StreamMetadata` from `checkpointLoc/metadata`.
- All internal logging in `StreamExecution` uses `(name, id, runId)` instead of just `name`

TODO
- [x] Test handling of name=null in json generation of StreamingQueryProgress
- [x] Test handling of name=null in json generation of StreamingQueryListener events
- [x] Test python API of runId

## How was this patch tested?
Updated unit tests and new unit tests

Author: Tathagata Das <ta...@gmail.com>

Closes #16113 from tdas/SPARK-18657.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb57bfe9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb57bfe9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb57bfe9

Branch: refs/heads/master
Commit: bb57bfe97d9fb077885065b8e804b85d4c493faf
Parents: 1b2785c
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Dec 5 18:17:38 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Dec 5 18:17:38 2016 -0800

----------------------------------------------------------------------
 project/MimaExcludes.scala                      |   3 +
 python/pyspark/sql/streaming.py                 |  19 +++-
 .../sql/execution/streaming/OffsetSeq.scala     |  27 ++++-
 .../sql/execution/streaming/OffsetSeqLog.scala  |   2 +-
 .../execution/streaming/ProgressReporter.scala  |   6 +-
 .../execution/streaming/StreamExecution.scala   | 105 +++++++++----------
 .../execution/streaming/StreamMetadata.scala    |  88 ++++++++++++++++
 .../execution/streaming/StreamProgress.scala    |   2 +-
 .../spark/sql/streaming/StreamingQuery.scala    |  19 +++-
 .../sql/streaming/StreamingQueryListener.scala  |  10 +-
 .../sql/streaming/StreamingQueryManager.scala   |  25 +++--
 .../apache/spark/sql/streaming/progress.scala   |   7 +-
 .../query-metadata-logs-version-2.1.0.txt       |   3 +
 .../execution/streaming/OffsetSeqLogSuite.scala |  13 ++-
 .../streaming/StreamMetadataSuite.scala         |  55 ++++++++++
 .../StreamExecutionMetadataSuite.scala          |  35 -------
 .../streaming/StreamingQueryListenerSuite.scala |  46 +++++---
 .../StreamingQueryStatusAndProgressSuite.scala  |  78 ++++++++++++--
 .../sql/streaming/StreamingQuerySuite.scala     | 100 ++++++++++++------
 19 files changed, 466 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bb57bfe9/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index f3e5a21..82d50f9 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -108,6 +108,9 @@ object MimaExcludes {
       // [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables"),
 
+      // [SPARK-18657] Add StreamingQuery.runId
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.runId"),
+
       // [SPARK-18694] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryException$"),
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.startOffset"),

http://git-wip-us.apache.org/repos/asf/spark/blob/bb57bfe9/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 4a7d17b..ee7a26d 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -51,14 +51,29 @@ class StreamingQuery(object):
     @property
     @since(2.0)
     def id(self):
-        """The id of the streaming query.
+        """Returns the unique id of this query that persists across restarts from checkpoint data.
+        That is, this id is generated when a query is started for the first time, and
+        will be the same every time it is restarted from checkpoint data.
+        There can only be one query with the same id active in a Spark cluster.
+        Also see, `runId`.
         """
         return self._jsq.id().toString()
 
     @property
+    @since(2.1)
+    def runId(self):
+        """Returns the unique id of this query that does not persist across restarts. That is, every
+        query that is started (or restarted from checkpoint) will have a different runId.
+        """
+        return self._jsq.runId().toString()
+
+    @property
     @since(2.0)
     def name(self):
-        """The name of the streaming query. This name is unique across all active queries.
+        """Returns the user-specified name of the query, or null if not specified.
+        This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter`
+        as `dataframe.writeStream.queryName("query").start()`.
+        This name, if set, must be unique across all active queries.
         """
         return self._jsq.name()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bb57bfe9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index 7469cae..e5a1997 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -17,13 +17,16 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
 
 /**
  * An ordered collection of offsets, used to track the progress of processing data from one or more
  * [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance
  * vector clock that must progress linearly forward.
  */
-case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[String] = None) {
+case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMetadata] = None) {
 
   /**
    * Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of
@@ -54,6 +57,26 @@ object OffsetSeq {
    * `nulls` in the sequence are converted to `None`s.
    */
   def fill(metadata: Option[String], offsets: Offset*): OffsetSeq = {
-    OffsetSeq(offsets.map(Option(_)), metadata)
+    OffsetSeq(offsets.map(Option(_)), metadata.map(OffsetSeqMetadata.apply))
   }
 }
+
+
+/**
+ * Contains metadata associated with a [[OffsetSeq]]. This information is
+ * persisted to the offset log in the checkpoint location via the [[OffsetSeq]] metadata field.
+ *
+ * @param batchWatermarkMs: The current eventTime watermark, used to
+ * bound the lateness of data that will processed. Time unit: milliseconds
+ * @param batchTimestampMs: The current batch processing timestamp.
+ * Time unit: milliseconds
+ */
+case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
+  def json: String = Serialization.write(this)(OffsetSeqMetadata.format)
+}
+
+object OffsetSeqMetadata {
+  private implicit val format = Serialization.formats(NoTypeHints)
+  def apply(json: String): OffsetSeqMetadata = Serialization.read[OffsetSeqMetadata](json)
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/bb57bfe9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
index cc25b44..3210d8a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
@@ -74,7 +74,7 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
 
     // write metadata
     out.write('\n')
-    out.write(offsetSeq.metadata.getOrElse("").getBytes(UTF_8))
+    out.write(offsetSeq.metadata.map(_.json).getOrElse("").getBytes(UTF_8))
 
     // write offsets, one per line
     offsetSeq.offsets.map(_.map(_.json)).foreach { offset =>

http://git-wip-us.apache.org/repos/asf/spark/blob/bb57bfe9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index ba77e7c..7d0d086 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -43,6 +43,7 @@ trait ProgressReporter extends Logging {
 
   // Internal state of the stream, required for computing metrics.
   protected def id: UUID
+  protected def runId: UUID
   protected def name: String
   protected def triggerClock: Clock
   protected def logicalPlan: LogicalPlan
@@ -52,7 +53,7 @@ trait ProgressReporter extends Logging {
   protected def committedOffsets: StreamProgress
   protected def sources: Seq[Source]
   protected def sink: Sink
-  protected def streamExecutionMetadata: StreamExecutionMetadata
+  protected def offsetSeqMetadata: OffsetSeqMetadata
   protected def currentBatchId: Long
   protected def sparkSession: SparkSession
 
@@ -134,11 +135,12 @@ trait ProgressReporter extends Logging {
 
     val newProgress = new StreamingQueryProgress(
       id = id,
+      runId = runId,
       name = name,
       timestamp = currentTriggerStartTimestamp,
       batchId = currentBatchId,
       durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava,
-      currentWatermark = streamExecutionMetadata.batchWatermarkMs,
+      currentWatermark = offsetSeqMetadata.batchWatermarkMs,
       stateOperators = executionStats.stateOperators.toArray,
       sources = sourceProgress.toArray,
       sink = sinkProgress)

http://git-wip-us.apache.org/repos/asf/spark/blob/bb57bfe9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6b1c01a..083cce8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -25,8 +25,6 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
-import org.json4s.NoTypeHints
-import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
@@ -58,9 +56,6 @@ class StreamExecution(
 
   import org.apache.spark.sql.streaming.StreamingQueryListener._
 
-  // TODO: restore this from the checkpoint directory.
-  override val id: UUID = UUID.randomUUID()
-
   private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
 
   private val noDataProgressEventInterval =
@@ -98,8 +93,30 @@ class StreamExecution(
   /** The current batchId or -1 if execution has not yet been initialized. */
   protected var currentBatchId: Long = -1
 
-  /** Stream execution metadata */
-  protected var streamExecutionMetadata = StreamExecutionMetadata()
+  /** Metadata associated with the whole query */
+  protected val streamMetadata: StreamMetadata = {
+    val metadataPath = new Path(checkpointFile("metadata"))
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    StreamMetadata.read(metadataPath, hadoopConf).getOrElse {
+      val newMetadata = new StreamMetadata(UUID.randomUUID.toString)
+      StreamMetadata.write(newMetadata, metadataPath, hadoopConf)
+      newMetadata
+    }
+  }
+
+  /** Metadata associated with the offset seq of a batch in the query. */
+  protected var offsetSeqMetadata = OffsetSeqMetadata()
+
+  override val id: UUID = UUID.fromString(streamMetadata.id)
+
+  override val runId: UUID = UUID.randomUUID
+
+  /**
+   * Pretty identified string of printing in logs. Format is
+   * If name is set "queryName [id = xyz, runId = abc]" else "[id = xyz, runId = abc]"
+   */
+  private val prettyIdString =
+    Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"
 
   /** All stream sources present in the query plan. */
   protected val sources =
@@ -128,8 +145,9 @@ class StreamExecution(
   /* Get the call site in the caller thread; will pass this into the micro batch thread */
   private val callSite = Utils.getCallSite()
 
-  /** Used to report metrics to coda-hale. */
-  lazy val streamMetrics = new MetricsReporter(this, s"spark.streaming.$name")
+  /** Used to report metrics to coda-hale. This uses id for easier tracking across restarts. */
+  lazy val streamMetrics = new MetricsReporter(
+    this, s"spark.streaming.${Option(name).getOrElse(id)}")
 
   /**
    * The thread that runs the micro-batches of this stream. Note that this thread must be
@@ -137,7 +155,7 @@ class StreamExecution(
    * [[HDFSMetadataLog]]. See SPARK-14131 for more details.
    */
   val microBatchThread =
-    new StreamExecutionThread(s"stream execution thread for $name") {
+    new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
       override def run(): Unit = {
         // To fix call site like "run at <unknown>:0", we bridge the call site from the caller
         // thread to this micro batch thread
@@ -191,7 +209,7 @@ class StreamExecution(
         sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
       }
 
-      postEvent(new QueryStartedEvent(id, name)) // Assumption: Does not throw exception.
+      postEvent(new QueryStartedEvent(id, runId, name)) // Assumption: Does not throw exception.
 
       // Unblock starting thread
       startLatch.countDown()
@@ -261,10 +279,10 @@ class StreamExecution(
       case e: Throwable =>
         streamDeathCause = new StreamingQueryException(
           this,
-          s"Query $name terminated with exception: ${e.getMessage}",
+          s"Query $prettyIdString terminated with exception: ${e.getMessage}",
           e,
-          committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString,
-          availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString)
+          committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString,
+          availableOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString)
         logError(s"Query $name terminated with error", e)
         updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
         // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
@@ -282,7 +300,7 @@ class StreamExecution(
       // Notify others
       sparkSession.streams.notifyQueryTermination(StreamExecution.this)
       postEvent(
-       new QueryTerminatedEvent(id, exception.map(_.cause).map(Utils.exceptionString)))
+       new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))
       terminationLatch.countDown()
     }
   }
@@ -301,9 +319,9 @@ class StreamExecution(
         logInfo(s"Resuming streaming query, starting with batch $batchId")
         currentBatchId = batchId
         availableOffsets = nextOffsets.toStreamProgress(sources)
-        streamExecutionMetadata = StreamExecutionMetadata(nextOffsets.metadata.getOrElse("{}"))
+        offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
         logDebug(s"Found possibly unprocessed offsets $availableOffsets " +
-          s"at batch timestamp ${streamExecutionMetadata.batchTimestampMs}")
+          s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}")
 
         offsetLog.get(batchId - 1).foreach {
           case lastOffsets =>
@@ -359,15 +377,15 @@ class StreamExecution(
     }
     if (hasNewData) {
       // Current batch timestamp in milliseconds
-      streamExecutionMetadata.batchTimestampMs = triggerClock.getTimeMillis()
+      offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
       updateStatusMessage("Writing offsets to log")
       reportTimeTaken("walCommit") {
         assert(offsetLog.add(
           currentBatchId,
-          availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)),
+          availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
           s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
         logInfo(s"Committed offsets for batch $currentBatchId. " +
-          s"Metadata ${streamExecutionMetadata.toString}")
+          s"Metadata ${offsetSeqMetadata.toString}")
 
         // NOTE: The following code is correct because runBatches() processes exactly one
         // batch at a time. If we add pipeline parallelism (multiple batches in flight at
@@ -437,21 +455,21 @@ class StreamExecution(
     val triggerLogicalPlan = withNewSources transformAllExpressions {
       case a: Attribute if replacementMap.contains(a) => replacementMap(a)
       case ct: CurrentTimestamp =>
-        CurrentBatchTimestamp(streamExecutionMetadata.batchTimestampMs,
+        CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
           ct.dataType)
       case cd: CurrentDate =>
-        CurrentBatchTimestamp(streamExecutionMetadata.batchTimestampMs,
+        CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
           cd.dataType)
     }
 
-    val executedPlan = reportTimeTaken("queryPlanning") {
+    reportTimeTaken("queryPlanning") {
       lastExecution = new IncrementalExecution(
         sparkSession,
         triggerLogicalPlan,
         outputMode,
         checkpointFile("state"),
         currentBatchId,
-        streamExecutionMetadata.batchWatermarkMs)
+        offsetSeqMetadata.batchWatermarkMs)
       lastExecution.executedPlan // Force the lazy generation of execution plan
     }
 
@@ -468,12 +486,12 @@ class StreamExecution(
         logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}")
         (e.maxEventTime.value / 1000) - e.delay.milliseconds()
     }.headOption.foreach { newWatermark =>
-      if (newWatermark > streamExecutionMetadata.batchWatermarkMs) {
+      if (newWatermark > offsetSeqMetadata.batchWatermarkMs) {
         logInfo(s"Updating eventTime watermark to: $newWatermark ms")
-        streamExecutionMetadata.batchWatermarkMs = newWatermark
+        offsetSeqMetadata.batchWatermarkMs = newWatermark
       } else {
         logTrace(s"Event time didn't move: $newWatermark < " +
-          s"$streamExecutionMetadata.currentEventTimeWatermark")
+          s"$offsetSeqMetadata.currentEventTimeWatermark")
       }
     }
 
@@ -503,7 +521,7 @@ class StreamExecution(
       microBatchThread.join()
     }
     uniqueSources.foreach(_.stop())
-    logInfo(s"Query $name was stopped")
+    logInfo(s"Query $prettyIdString was stopped")
   }
 
   /**
@@ -594,7 +612,7 @@ class StreamExecution(
   override def explain(): Unit = explain(extended = false)
 
   override def toString: String = {
-    s"Streaming Query - $name [state = $state]"
+    s"Streaming Query $prettyIdString [state = $state]"
   }
 
   def toDebugString: String = {
@@ -603,7 +621,7 @@ class StreamExecution(
     } else ""
     s"""
        |=== Streaming Query ===
-       |Name: $name
+       |Identifier: $prettyIdString
        |Current Offsets: $committedOffsets
        |
        |Current State: $state
@@ -622,33 +640,6 @@ class StreamExecution(
   case object TERMINATED extends State
 }
 
-/**
- * Contains metadata associated with a stream execution. This information is
- * persisted to the offset log via the OffsetSeq metadata field. Current
- * information contained in this object includes:
- *
- * @param batchWatermarkMs: The current eventTime watermark, used to
- * bound the lateness of data that will processed. Time unit: milliseconds
- * @param batchTimestampMs: The current batch processing timestamp.
- * Time unit: milliseconds
- */
-case class StreamExecutionMetadata(
-    var batchWatermarkMs: Long = 0,
-    var batchTimestampMs: Long = 0) {
-  private implicit val formats = StreamExecutionMetadata.formats
-
-  /**
-   * JSON string representation of this object.
-   */
-  def json: String = Serialization.write(this)
-}
-
-object StreamExecutionMetadata {
-  private implicit val formats = Serialization.formats(NoTypeHints)
-
-  def apply(json: String): StreamExecutionMetadata =
-    Serialization.read[StreamExecutionMetadata](json)
-}
 
 /**
  * A special thread to run the stream query. Some codes require to run in the StreamExecutionThread

http://git-wip-us.apache.org/repos/asf/spark/blob/bb57bfe9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
new file mode 100644
index 0000000..7807c9f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{InputStreamReader, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+
+import scala.util.control.NonFatal
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, FSDataOutputStream, Path}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.streaming.StreamingQuery
+
+/**
+ * Contains metadata associated with a [[StreamingQuery]]. This information is written
+ * in the checkpoint location the first time a query is started and recovered every time the query
+ * is restarted.
+ *
+ * @param id  unique id of the [[StreamingQuery]] that needs to be persisted across restarts
+ */
+case class StreamMetadata(id: String) {
+  def json: String = Serialization.write(this)(StreamMetadata.format)
+}
+
+object StreamMetadata extends Logging {
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  /** Read the metadata from file if it exists */
+  def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamMetadata] = {
+    val fs = FileSystem.get(hadoopConf)
+    if (fs.exists(metadataFile)) {
+      var input: FSDataInputStream = null
+      try {
+        input = fs.open(metadataFile)
+        val reader = new InputStreamReader(input, StandardCharsets.UTF_8)
+        val metadata = Serialization.read[StreamMetadata](reader)
+        Some(metadata)
+      } catch {
+        case NonFatal(e) =>
+          logError(s"Error reading stream metadata from $metadataFile", e)
+          throw e
+      } finally {
+        IOUtils.closeQuietly(input)
+      }
+    } else None
+  }
+
+  /** Write metadata to file */
+  def write(
+      metadata: StreamMetadata,
+      metadataFile: Path,
+      hadoopConf: Configuration): Unit = {
+    var output: FSDataOutputStream = null
+    try {
+      val fs = FileSystem.get(hadoopConf)
+      output = fs.create(metadataFile)
+      val writer = new OutputStreamWriter(output)
+      Serialization.write(metadata, writer)
+      writer.close()
+    } catch {
+      case NonFatal(e) =>
+        logError(s"Error writing stream metadata $metadata to $metadataFile", e)
+        throw e
+    } finally {
+      IOUtils.closeQuietly(output)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/bb57bfe9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
index 21b8750..a3f3662 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
@@ -26,7 +26,7 @@ class StreamProgress(
     val baseMap: immutable.Map[Source, Offset] = new immutable.HashMap[Source, Offset])
   extends scala.collection.immutable.Map[Source, Offset] {
 
-  def toOffsetSeq(source: Seq[Source], metadata: String): OffsetSeq = {
+  def toOffsetSeq(source: Seq[Source], metadata: OffsetSeqMetadata): OffsetSeq = {
     OffsetSeq(source.map(get), Some(metadata))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bb57bfe9/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index 8fc4e43..1794e75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -32,21 +32,32 @@ import org.apache.spark.sql.SparkSession
 trait StreamingQuery {
 
   /**
-   * Returns the name of the query. This name is unique across all active queries. This can be
-   * set in the `org.apache.spark.sql.streaming.DataStreamWriter` as
-   * `dataframe.writeStream.queryName("query").start()`.
+   * Returns the user-specified name of the query, or null if not specified.
+   * This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter`
+   * as `dataframe.writeStream.queryName("query").start()`.
+   * This name, if set, must be unique across all active queries.
    *
    * @since 2.0.0
    */
   def name: String
 
   /**
-   * Returns the unique id of this query.
+   * Returns the unique id of this query that persists across restarts from checkpoint data.
+   * That is, this id is generated when a query is started for the first time, and
+   * will be the same every time it is restarted from checkpoint data. Also see [[runId]].
+   *
    * @since 2.1.0
    */
   def id: UUID
 
   /**
+   * Returns the unique id of this run of the query. That is, every start/restart of a query will
+   * generated a unique runId. Therefore, every time a query is restarted from
+   * checkpoint, it will have the same [[id]] but different [[runId]]s.
+   */
+  def runId: UUID
+
+  /**
    * Returns the `SparkSession` associated with `this`.
    *
    * @since 2.0.0

http://git-wip-us.apache.org/repos/asf/spark/blob/bb57bfe9/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index d9ee75c..6fc859d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -86,7 +86,10 @@ object StreamingQueryListener {
    * @since 2.1.0
    */
   @Experimental
-  class QueryStartedEvent private[sql](val id: UUID, val name: String) extends Event
+  class QueryStartedEvent private[sql](
+      val id: UUID,
+      val runId: UUID,
+      val name: String) extends Event
 
   /**
    * :: Experimental ::
@@ -106,5 +109,8 @@ object StreamingQueryListener {
    * @since 2.1.0
    */
   @Experimental
-  class QueryTerminatedEvent private[sql](val id: UUID, val exception: Option[String]) extends Event
+  class QueryTerminatedEvent private[sql](
+      val id: UUID,
+      val runId: UUID,
+      val exception: Option[String]) extends Event
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bb57bfe9/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index c448468..c6ab416 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -207,10 +207,14 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
       trigger: Trigger = ProcessingTime(0),
       triggerClock: Clock = new SystemClock()): StreamingQuery = {
     activeQueriesLock.synchronized {
-      val name = userSpecifiedName.getOrElse(s"query-${StreamingQueryManager.nextId}")
-      if (activeQueries.values.exists(_.name == name)) {
-        throw new IllegalArgumentException(
-          s"Cannot start query with name $name as a query with that name is already active")
+      val name = userSpecifiedName match {
+        case Some(n) =>
+          if (activeQueries.values.exists(_.name == userSpecifiedName.get)) {
+            throw new IllegalArgumentException(
+              s"Cannot start query with name $n as a query with that name is already active")
+          }
+          n
+        case None => null
       }
       val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
         new Path(userSpecified).toUri.toString
@@ -268,6 +272,14 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
         trigger,
         triggerClock,
         outputMode)
+
+      if (activeQueries.values.exists(_.id == query.id)) {
+        throw new IllegalStateException(
+          s"Cannot start query with id ${query.id} as another query with same id is " +
+            s"already active. Perhaps you are attempting to restart a query from checkpoint" +
+            s"that is already active.")
+      }
+
       query.start()
       activeQueries.put(query.id, query)
       query
@@ -287,8 +299,3 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
     }
   }
 }
-
-private object StreamingQueryManager {
-  private val _nextId = new AtomicLong(0)
-  private def nextId: Long = _nextId.getAndIncrement()
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/bb57bfe9/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index fb5bad0..f768080 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -57,8 +57,9 @@ class StateOperatorProgress private[sql](
  * a trigger. Each event relates to processing done for a single trigger of the streaming
  * query. Events are emitted even when no new data is available to be processed.
  *
- * @param id A unique id of the query.
- * @param name Name of the query. This name is unique across all active queries.
+ * @param id An unique query id that persists across restarts. See `StreamingQuery.id()`.
+ * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
+ * @param name User-specified name of the query, null if not specified.
  * @param timestamp Timestamp (ms) of the beginning of the trigger.
  * @param batchId A unique id for the current batch of data being processed.  Note that in the
  *                case of retries after a failure a given batchId my be executed more than once.
@@ -73,6 +74,7 @@ class StateOperatorProgress private[sql](
 @Experimental
 class StreamingQueryProgress private[sql](
   val id: UUID,
+  val runId: UUID,
   val name: String,
   val timestamp: Long,
   val batchId: Long,
@@ -105,6 +107,7 @@ class StreamingQueryProgress private[sql](
     }
 
     ("id" -> JString(id.toString)) ~
+    ("runId" -> JString(runId.toString)) ~
     ("name" -> JString(name)) ~
     ("timestamp" -> JInt(timestamp)) ~
     ("numInputRows" -> JInt(numInputRows)) ~

http://git-wip-us.apache.org/repos/asf/spark/blob/bb57bfe9/sql/core/src/test/resources/structured-streaming/query-metadata-logs-version-2.1.0.txt
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/query-metadata-logs-version-2.1.0.txt b/sql/core/src/test/resources/structured-streaming/query-metadata-logs-version-2.1.0.txt
new file mode 100644
index 0000000..79613e2
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/query-metadata-logs-version-2.1.0.txt
@@ -0,0 +1,3 @@
+{
+  "id": "d366a8bf-db79-42ca-b5a4-d9ca0a11d63e"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/bb57bfe9/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index 3afd11f..d3a83ea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -27,10 +27,19 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
   /** test string offset type */
   case class StringOffset(override val json: String) extends Offset
 
-  testWithUninterruptibleThread("serialization - deserialization") {
+  test("OffsetSeqMetadata - deserialization") {
+    assert(OffsetSeqMetadata(0, 0) === OffsetSeqMetadata("""{}"""))
+    assert(OffsetSeqMetadata(1, 0) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
+    assert(OffsetSeqMetadata(0, 2) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
+    assert(
+      OffsetSeqMetadata(1, 2) ===
+        OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+  }
+
+  testWithUninterruptibleThread("OffsetSeqLog - serialization - deserialization") {
     withTempDir { temp =>
       val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
-    val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath)
+      val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath)
       val batch0 = OffsetSeq.fill(LongOffset(0), LongOffset(1), LongOffset(2))
       val batch1 = OffsetSeq.fill(StringOffset("one"), StringOffset("two"), StringOffset("three"))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bb57bfe9/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetadataSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetadataSuite.scala
new file mode 100644
index 0000000..87f8004
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetadataSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.File
+import java.util.UUID
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.streaming.StreamTest
+
+class StreamMetadataSuite extends StreamTest {
+
+  test("writing and reading") {
+    withTempDir { dir =>
+      val id = UUID.randomUUID.toString
+      val metadata = StreamMetadata(id)
+      val file = new Path(new File(dir, "test").toString)
+      StreamMetadata.write(metadata, file, hadoopConf)
+      val readMetadata = StreamMetadata.read(file, hadoopConf)
+      assert(readMetadata.nonEmpty)
+      assert(readMetadata.get.id === id)
+    }
+  }
+
+  test("read Spark 2.1.0 format") {
+    // query-metadata-logs-version-2.1.0.txt has the execution metadata generated by Spark 2.1.0
+    assert(
+      readForResource("query-metadata-logs-version-2.1.0.txt") ===
+      StreamMetadata("d366a8bf-db79-42ca-b5a4-d9ca0a11d63e"))
+  }
+
+  private def readForResource(fileName: String): StreamMetadata = {
+    val input = getClass.getResource(s"/structured-streaming/$fileName")
+    StreamMetadata.read(new Path(input.toString), hadoopConf).get
+  }
+
+  private val hadoopConf = new Configuration()
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/bb57bfe9/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala
deleted file mode 100644
index c7139c5..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.apache.spark.sql.execution.streaming.StreamExecutionMetadata
-
-class StreamExecutionMetadataSuite extends StreamTest {
-
-  test("stream execution metadata") {
-    assert(StreamExecutionMetadata(0, 0) ===
-      StreamExecutionMetadata("""{}"""))
-    assert(StreamExecutionMetadata(1, 0) ===
-      StreamExecutionMetadata("""{"batchWatermarkMs":1}"""))
-    assert(StreamExecutionMetadata(0, 2) ===
-      StreamExecutionMetadata("""{"batchTimestampMs":2}"""))
-    assert(StreamExecutionMetadata(1, 2) ===
-      StreamExecutionMetadata(
-        """{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/bb57bfe9/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 3086abf..a38c05e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -69,6 +69,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         AssertOnQuery { query =>
           assert(listener.startEvent !== null)
           assert(listener.startEvent.id === query.id)
+          assert(listener.startEvent.runId === query.runId)
           assert(listener.startEvent.name === query.name)
           assert(listener.progressEvents.isEmpty)
           assert(listener.terminationEvent === null)
@@ -92,6 +93,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
           eventually(Timeout(streamingTimeout)) {
             assert(listener.terminationEvent !== null)
             assert(listener.terminationEvent.id === query.id)
+            assert(listener.terminationEvent.runId === query.runId)
             assert(listener.terminationEvent.exception === None)
           }
           listener.checkAsyncErrors()
@@ -167,30 +169,40 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   }
 
   test("QueryStartedEvent serialization") {
-    val queryStarted = new StreamingQueryListener.QueryStartedEvent(UUID.randomUUID(), "name")
-    val json = JsonProtocol.sparkEventToJson(queryStarted)
-    val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
-      .asInstanceOf[StreamingQueryListener.QueryStartedEvent]
+    def testSerialization(event: QueryStartedEvent): Unit = {
+      val json = JsonProtocol.sparkEventToJson(event)
+      val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryStartedEvent]
+      assert(newEvent.id === event.id)
+      assert(newEvent.runId === event.runId)
+      assert(newEvent.name === event.name)
+    }
+
+    testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, "name"))
+    testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, null))
   }
 
   test("QueryProgressEvent serialization") {
-    val event = new StreamingQueryListener.QueryProgressEvent(
-      StreamingQueryStatusAndProgressSuite.testProgress)
-    val json = JsonProtocol.sparkEventToJson(event)
-    val newEvent = JsonProtocol.sparkEventFromJson(json)
-      .asInstanceOf[StreamingQueryListener.QueryProgressEvent]
-    assert(event.progress.json === newEvent.progress.json)
+    def testSerialization(event: QueryProgressEvent): Unit = {
+      val json = JsonProtocol.sparkEventToJson(event)
+      val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryProgressEvent]
+      assert(newEvent.progress.json === event.progress.json)  // json as a proxy for equality
+    }
+    testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress1))
+    testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress2))
   }
 
   test("QueryTerminatedEvent serialization") {
+    def testSerialization(event: QueryTerminatedEvent): Unit = {
+      val json = JsonProtocol.sparkEventToJson(event)
+      val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryTerminatedEvent]
+      assert(newEvent.id === event.id)
+      assert(newEvent.runId === event.runId)
+      assert(newEvent.exception === event.exception)
+    }
+
     val exception = new RuntimeException("exception")
-    val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent(
-      UUID.randomUUID, Some(exception.getMessage))
-    val json = JsonProtocol.sparkEventToJson(queryQueryTerminated)
-    val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
-      .asInstanceOf[StreamingQueryListener.QueryTerminatedEvent]
-    assert(queryQueryTerminated.id === newQueryTerminated.id)
-    assert(queryQueryTerminated.exception === newQueryTerminated.exception)
+    testSerialization(
+      new QueryTerminatedEvent(UUID.randomUUID, UUID.randomUUID, Some(exception.getMessage)))
   }
 
   test("only one progress event per interval when no data") {

http://git-wip-us.apache.org/repos/asf/spark/blob/bb57bfe9/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 4da712f..96f19db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -31,12 +31,13 @@ import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._
 class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
 
   test("StreamingQueryProgress - prettyJson") {
-    val json = testProgress.prettyJson
-    assert(json ===
+    val json1 = testProgress1.prettyJson
+    assert(json1 ===
       s"""
         |{
-        |  "id" : "${testProgress.id.toString}",
-        |  "name" : "name",
+        |  "id" : "${testProgress1.id.toString}",
+        |  "runId" : "${testProgress1.runId.toString}",
+        |  "name" : "myName",
         |  "timestamp" : 1,
         |  "numInputRows" : 678,
         |  "inputRowsPerSecond" : 10.0,
@@ -60,16 +61,48 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
         |  }
         |}
       """.stripMargin.trim)
-    assert(compact(parse(json)) === testProgress.json)
-
+    assert(compact(parse(json1)) === testProgress1.json)
+
+    val json2 = testProgress2.prettyJson
+    assert(
+      json2 ===
+        s"""
+         |{
+         |  "id" : "${testProgress2.id.toString}",
+         |  "runId" : "${testProgress2.runId.toString}",
+         |  "name" : null,
+         |  "timestamp" : 1,
+         |  "numInputRows" : 678,
+         |  "durationMs" : {
+         |    "total" : 0
+         |  },
+         |  "currentWatermark" : 3,
+         |  "stateOperators" : [ {
+         |    "numRowsTotal" : 0,
+         |    "numRowsUpdated" : 1
+         |  } ],
+         |  "sources" : [ {
+         |    "description" : "source",
+         |    "startOffset" : 123,
+         |    "endOffset" : 456,
+         |    "numInputRows" : 678
+         |  } ],
+         |  "sink" : {
+         |    "description" : "sink"
+         |  }
+         |}
+      """.stripMargin.trim)
+    assert(compact(parse(json2)) === testProgress2.json)
   }
 
   test("StreamingQueryProgress - json") {
-    assert(compact(parse(testProgress.json)) === testProgress.json)
+    assert(compact(parse(testProgress1.json)) === testProgress1.json)
+    assert(compact(parse(testProgress2.json)) === testProgress2.json)
   }
 
   test("StreamingQueryProgress - toString") {
-    assert(testProgress.toString === testProgress.prettyJson)
+    assert(testProgress1.toString === testProgress1.prettyJson)
+    assert(testProgress2.toString === testProgress2.prettyJson)
   }
 
   test("StreamingQueryStatus - prettyJson") {
@@ -94,9 +127,10 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
 }
 
 object StreamingQueryStatusAndProgressSuite {
-  val testProgress = new StreamingQueryProgress(
-    id = UUID.randomUUID(),
-    name = "name",
+  val testProgress1 = new StreamingQueryProgress(
+    id = UUID.randomUUID,
+    runId = UUID.randomUUID,
+    name = "myName",
     timestamp = 1L,
     batchId = 2L,
     durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
@@ -115,6 +149,28 @@ object StreamingQueryStatusAndProgressSuite {
     sink = new SinkProgress("sink")
   )
 
+  val testProgress2 = new StreamingQueryProgress(
+    id = UUID.randomUUID,
+    runId = UUID.randomUUID,
+    name = null, // should not be present in the json
+    timestamp = 1L,
+    batchId = 2L,
+    durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
+    currentWatermark = 3L,
+    stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
+    sources = Array(
+      new SourceProgress(
+        description = "source",
+        startOffset = "123",
+        endOffset = "456",
+        numInputRows = 678,
+        inputRowsPerSecond = Double.NaN, // should not be present in the json
+        processedRowsPerSecond = Double.NegativeInfinity // should not be present in the json
+      )
+    ),
+    sink = new SinkProgress("sink")
+  )
+
   val testStatus = new StreamingQueryStatus("active", true, false)
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bb57bfe9/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index f7fc194..893cb76 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.streaming
 
+import org.apache.commons.lang3.RandomStringUtils
 import org.scalactic.TolerantNumerics
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.BeforeAndAfter
@@ -28,7 +29,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
-import org.apache.spark.util.{ManualClock, Utils}
+import org.apache.spark.util.ManualClock
 
 
 class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
@@ -43,38 +44,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     sqlContext.streams.active.foreach(_.stop())
   }
 
-  test("names unique across active queries, ids unique across all started queries") {
-    val inputData = MemoryStream[Int]
-    val mapped = inputData.toDS().map { 6 / _}
+  test("name unique in active queries") {
+    withTempDir { dir =>
+      def startQuery(name: Option[String]): StreamingQuery = {
+        val writer = MemoryStream[Int].toDS.writeStream
+        name.foreach(writer.queryName)
+        writer
+          .foreach(new TestForeachWriter)
+          .start()
+      }
 
-    def startQuery(queryName: String): StreamingQuery = {
-      val metadataRoot = Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath
-      val writer = mapped.writeStream
-      writer
-        .queryName(queryName)
-        .format("memory")
-        .option("checkpointLocation", metadataRoot)
-        .start()
-    }
+      // No name by default, multiple active queries can have no name
+      val q1 = startQuery(name = None)
+      assert(q1.name === null)
+      val q2 = startQuery(name = None)
+      assert(q2.name === null)
+
+      // Can be set by user
+      val q3 = startQuery(name = Some("q3"))
+      assert(q3.name === "q3")
 
-    val q1 = startQuery("q1")
-    assert(q1.name === "q1")
+      // Multiple active queries cannot have same name
+      val e = intercept[IllegalArgumentException] {
+        startQuery(name = Some("q3"))
+      }
 
-    // Verify that another query with same name cannot be started
-    val e1 = intercept[IllegalArgumentException] {
-      startQuery("q1")
+      q1.stop()
+      q2.stop()
+      q3.stop()
     }
-    Seq("q1", "already active").foreach { s => assert(e1.getMessage.contains(s)) }
+  }
 
-    // Verify q1 was unaffected by the above exception and stop it
-    assert(q1.isActive)
-    q1.stop()
+  test(
+    "id unique in active queries + persists across restarts, runId unique across start/restarts") {
+    val inputData = MemoryStream[Int]
+    withTempDir { dir =>
+      var cpDir: String = null
+
+      def startQuery(restart: Boolean): StreamingQuery = {
+        if (cpDir == null || !restart) cpDir = s"$dir/${RandomStringUtils.randomAlphabetic(10)}"
+        MemoryStream[Int].toDS().groupBy().count()
+          .writeStream
+          .format("memory")
+          .outputMode("complete")
+          .queryName(s"name${RandomStringUtils.randomAlphabetic(10)}")
+          .option("checkpointLocation", cpDir)
+          .start()
+      }
 
-    // Verify another query can be started with name q1, but will have different id
-    val q2 = startQuery("q1")
-    assert(q2.name === "q1")
-    assert(q2.id !== q1.id)
-    q2.stop()
+      // id and runId unique for new queries
+      val q1 = startQuery(restart = false)
+      val q2 = startQuery(restart = false)
+      assert(q1.id !== q2.id)
+      assert(q1.runId !== q2.runId)
+      q1.stop()
+      q2.stop()
+
+      // id persists across restarts, runId unique across restarts
+      val q3 = startQuery(restart = false)
+      q3.stop()
+
+      val q4 = startQuery(restart = true)
+      q4.stop()
+      assert(q3.id === q3.id)
+      assert(q3.runId !== q4.runId)
+
+      // Only one query with same id can be active
+      val q5 = startQuery(restart = false)
+      val e = intercept[IllegalStateException] {
+        startQuery(restart = true)
+      }
+    }
   }
 
   testQuietly("isActive, exception, and awaitTermination") {
@@ -105,9 +145,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
       AssertOnQuery(q => {
         q.exception.get.startOffset ===
-          q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").toString &&
+          q.committedOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString &&
           q.exception.get.endOffset ===
-            q.availableOffsets.toOffsetSeq(Seq(inputData), "{}").toString
+            q.availableOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString
       }, "incorrect start offset or end offset on exception")
     )
   }
@@ -274,7 +314,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
 
     /** Whether metrics of a query is registered for reporting */
     def isMetricsRegistered(query: StreamingQuery): Boolean = {
-      val sourceName = s"spark.streaming.${query.name}"
+      val sourceName = s"spark.streaming.${query.id}"
       val sources = spark.sparkContext.env.metricsSystem.getSourcesByName(sourceName)
       require(sources.size <= 1)
       sources.nonEmpty


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org