You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2018/07/23 20:03:43 UTC

spark git commit: [SPARK-24699][SS] Make watermarks work with Trigger.Once by saving updated watermark to commit log

Repository: spark
Updated Branches:
  refs/heads/master 2edf17eff -> 61f0ca4f1


[SPARK-24699][SS] Make watermarks work with Trigger.Once by saving updated watermark to commit log

## What changes were proposed in this pull request?

Streaming queries with watermarks do not work with Trigger.Once because of the following.
- Watermark is updated in the driver memory after a batch completes, but it is persisted to checkpoint (in the offset log) only when the next batch is planned
- In trigger.once, the query terminated as soon as one batch has completed. Hence, the updated watermark is never persisted anywhere.

The simple solution is to persist the updated watermark value in the commit log when a batch is marked as completed. Then the next batch, in the next trigger.once run can pick it up from the commit log.

## How was this patch tested?
new unit tests

Co-authored-by: Tathagata Das <tathagata.das1565gmail.com>
Co-authored-by: c-horn <chorn4033gmail.com>

Author: Tathagata Das <ta...@gmail.com>

Closes #21746 from tdas/SPARK-24699.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61f0ca4f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61f0ca4f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61f0ca4f

Branch: refs/heads/master
Commit: 61f0ca4f1c4f1498c0b6ad02370839619871d6c5
Parents: 2edf17e
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Jul 23 13:03:32 2018 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Jul 23 13:03:32 2018 -0700

----------------------------------------------------------------------
 .../sql/execution/streaming/CommitLog.scala     |  33 ++--
 .../streaming/MicroBatchExecution.scala         |   9 +-
 .../continuous/ContinuousExecution.scala        |   2 +-
 .../commits/0                                   |   2 +
 .../commits/1                                   |   2 +
 .../metadata                                    |   1 +
 .../offsets/0                                   |   3 +
 .../offsets/1                                   |   3 +
 .../state/0/0/1.delta                           | Bin 0 -> 46 bytes
 .../state/0/0/2.delta                           | Bin 0 -> 46 bytes
 .../state/0/1/1.delta                           | Bin 0 -> 46 bytes
 .../state/0/1/2.delta                           | Bin 0 -> 46 bytes
 .../state/0/2/1.delta                           | Bin 0 -> 103 bytes
 .../state/0/2/2.delta                           | Bin 0 -> 46 bytes
 .../state/0/3/1.delta                           | Bin 0 -> 46 bytes
 .../state/0/3/2.delta                           | Bin 0 -> 46 bytes
 .../state/0/4/1.delta                           | Bin 0 -> 46 bytes
 .../state/0/4/2.delta                           | Bin 0 -> 103 bytes
 .../sql/streaming/EventTimeWatermarkSuite.scala | 156 ++++++++++++++++---
 .../apache/spark/sql/streaming/StreamTest.scala |   8 +-
 20 files changed, 177 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
index 5b11424..0063318 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
@@ -22,6 +22,9 @@ import java.nio.charset.StandardCharsets._
 
 import scala.io.{Source => IOSource}
 
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
 import org.apache.spark.sql.SparkSession
 
 /**
@@ -43,36 +46,28 @@ import org.apache.spark.sql.SparkSession
  * line 2: metadata (optional json string)
  */
 class CommitLog(sparkSession: SparkSession, path: String)
-  extends HDFSMetadataLog[String](sparkSession, path) {
+  extends HDFSMetadataLog[CommitMetadata](sparkSession, path) {
 
   import CommitLog._
 
-  def add(batchId: Long): Unit = {
-    super.add(batchId, EMPTY_JSON)
-  }
-
-  override def add(batchId: Long, metadata: String): Boolean = {
-    throw new UnsupportedOperationException(
-      "CommitLog does not take any metadata, use 'add(batchId)' instead")
-  }
-
-  override protected def deserialize(in: InputStream): String = {
+  override protected def deserialize(in: InputStream): CommitMetadata = {
     // called inside a try-finally where the underlying stream is closed in the caller
     val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
     if (!lines.hasNext) {
       throw new IllegalStateException("Incomplete log file in the offset commit log")
     }
     parseVersion(lines.next.trim, VERSION)
-    EMPTY_JSON
+    val metadataJson = if (lines.hasNext) lines.next else EMPTY_JSON
+    CommitMetadata(metadataJson)
   }
 
-  override protected def serialize(metadata: String, out: OutputStream): Unit = {
+  override protected def serialize(metadata: CommitMetadata, out: OutputStream): Unit = {
     // called inside a try-finally where the underlying stream is closed in the caller
     out.write(s"v${VERSION}".getBytes(UTF_8))
     out.write('\n')
 
     // write metadata
-    out.write(EMPTY_JSON.getBytes(UTF_8))
+    out.write(metadata.json.getBytes(UTF_8))
   }
 }
 
@@ -81,3 +76,13 @@ object CommitLog {
   private val EMPTY_JSON = "{}"
 }
 
+
+case class CommitMetadata(nextBatchWatermarkMs: Long = 0) {
+  def json: String = Serialization.write(this)(CommitMetadata.format)
+}
+
+object CommitMetadata {
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  def apply(json: String): CommitMetadata = Serialization.read[CommitMetadata](json)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 45c43f5..abb807d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -268,7 +268,7 @@ class MicroBatchExecution(
          * latest batch id in the offset log, then we can safely move to the next batch
          * i.e., committedBatchId + 1 */
         commitLog.getLatest() match {
-          case Some((latestCommittedBatchId, _)) =>
+          case Some((latestCommittedBatchId, commitMetadata)) =>
             if (latestBatchId == latestCommittedBatchId) {
               /* The last batch was successfully committed, so we can safely process a
                * new next batch but first:
@@ -286,7 +286,8 @@ class MicroBatchExecution(
               currentBatchId = latestCommittedBatchId + 1
               isCurrentBatchConstructed = false
               committedOffsets ++= availableOffsets
-              // Construct a new batch be recomputing availableOffsets
+              watermarkTracker.setWatermark(
+                math.max(watermarkTracker.currentWatermark, commitMetadata.nextBatchWatermarkMs))
             } else if (latestCommittedBatchId < latestBatchId - 1) {
               logWarning(s"Batch completion log latest batch id is " +
                 s"${latestCommittedBatchId}, which is not trailing " +
@@ -536,11 +537,11 @@ class MicroBatchExecution(
     }
 
     withProgressLocked {
-      commitLog.add(currentBatchId)
+      watermarkTracker.updateWatermark(lastExecution.executedPlan)
+      commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
       committedOffsets ++= availableOffsets
       awaitProgressLockCondition.signalAll()
     }
-    watermarkTracker.updateWatermark(lastExecution.executedPlan)
     logDebug(s"Completed batch ${currentBatchId}")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index e991dbc..140cec6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -314,7 +314,7 @@ class ContinuousExecution(
       // Record offsets before updating `committedOffsets`
       recordTriggerOffsets(from = committedOffsets, to = availableOffsets)
       if (queryExecutionThread.isAlive) {
-        commitLog.add(epoch)
+        commitLog.add(epoch, CommitMetadata())
         val offset =
           continuousSources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json)
         committedOffsets ++= Seq(continuousSources(0) -> offset)

http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/commits/0
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/commits/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/commits/0
new file mode 100644
index 0000000..83321cd
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/commits/0
@@ -0,0 +1,2 @@
+v1
+{}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/commits/1
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/commits/1 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/commits/1
new file mode 100644
index 0000000..83321cd
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/commits/1
@@ -0,0 +1,2 @@
+v1
+{}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/metadata
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/metadata b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/metadata
new file mode 100644
index 0000000..f205857
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/metadata
@@ -0,0 +1 @@
+{"id":"73f7f943-0a08-4ffb-a504-9fa88ff7612a"}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/offsets/0
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/offsets/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/offsets/0
new file mode 100644
index 0000000..8fa80be
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/offsets/0
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1531991874513,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
+0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/offsets/1
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/offsets/1 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/offsets/1
new file mode 100644
index 0000000..2248a58
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/offsets/1
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":5000,"batchTimestampMs":1531991878604,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
+1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/0/1.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/0/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/0/1.delta
new file mode 100644
index 0000000..6352978
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/0/1.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/0/2.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/0/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/0/2.delta
new file mode 100644
index 0000000..6352978
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/0/2.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/1/1.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/1/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/1/1.delta
new file mode 100644
index 0000000..6352978
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/1/1.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/1/2.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/1/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/1/2.delta
new file mode 100644
index 0000000..6352978
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/1/2.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/2/1.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/2/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/2/1.delta
new file mode 100644
index 0000000..171aa58
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/2/1.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/2/2.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/2/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/2/2.delta
new file mode 100644
index 0000000..6352978
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/2/2.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/3/1.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/3/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/3/1.delta
new file mode 100644
index 0000000..6352978
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/3/1.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/3/2.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/3/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/3/2.delta
new file mode 100644
index 0000000..6352978
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/3/2.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4/1.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4/1.delta
new file mode 100644
index 0000000..6352978
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4/1.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4/2.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4/2.delta
new file mode 100644
index 0000000..cfb3a48
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4/2.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 58ed979..026af17 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -127,31 +127,133 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     testStream(aggWithWatermark)(
       AddData(inputData2, 15),
       CheckAnswer(),
-      assertEventStats { e =>
-        assert(e.get("max") === formatTimestamp(15))
-        assert(e.get("min") === formatTimestamp(15))
-        assert(e.get("avg") === formatTimestamp(15))
-        assert(e.get("watermark") === formatTimestamp(0))
-      },
+      assertEventStats(min = 15, max = 15, avg = 15, wtrmark = 0),
       AddData(inputData2, 10, 12, 14),
       CheckAnswer(),
-      assertEventStats { e =>
-        assert(e.get("max") === formatTimestamp(14))
-        assert(e.get("min") === formatTimestamp(10))
-        assert(e.get("avg") === formatTimestamp(12))
-        assert(e.get("watermark") === formatTimestamp(5))
-      },
+      assertEventStats(min = 10, max = 14, avg = 12, wtrmark = 5),
       AddData(inputData2, 25),
       CheckAnswer((10, 3)),
-      assertEventStats { e =>
-        assert(e.get("max") === formatTimestamp(25))
-        assert(e.get("min") === formatTimestamp(25))
-        assert(e.get("avg") === formatTimestamp(25))
-        assert(e.get("watermark") === formatTimestamp(5))
-      }
+      assertEventStats(min = 25, max = 25, avg = 25, wtrmark = 5)
     )
   }
 
+  test("event time and watermark metrics with Trigger.Once (SPARK-24699)") {
+    // All event time metrics where watermarking is set
+    val inputData = MemoryStream[Int]
+    val aggWithWatermark = inputData.toDF()
+        .withColumn("eventTime", $"value".cast("timestamp"))
+        .withWatermark("eventTime", "10 seconds")
+        .groupBy(window($"eventTime", "5 seconds") as 'window)
+        .agg(count("*") as 'count)
+        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    // Unlike the ProcessingTime trigger, Trigger.Once only runs one trigger every time
+    // the query is started and it does not run no-data batches. Hence the answer generated
+    // by the updated watermark is only generated the next time the query is started.
+    // Also, the data to process in the next trigger is added *before* starting the stream in
+    // Trigger.Once to ensure that first and only trigger picks up the new data.
+
+    testStream(aggWithWatermark)(
+      StartStream(Trigger.Once),  // to make sure the query is not running when adding data 1st time
+      awaitTermination(),
+
+      AddData(inputData, 15),
+      StartStream(Trigger.Once),
+      awaitTermination(),
+      CheckNewAnswer(),
+      assertEventStats(min = 15, max = 15, avg = 15, wtrmark = 0),
+      // watermark should be updated to 15 - 10 = 5
+
+      AddData(inputData, 10, 12, 14),
+      StartStream(Trigger.Once),
+      awaitTermination(),
+      CheckNewAnswer(),
+      assertEventStats(min = 10, max = 14, avg = 12, wtrmark = 5),
+      // watermark should stay at 5
+
+      AddData(inputData, 25),
+      StartStream(Trigger.Once),
+      awaitTermination(),
+      CheckNewAnswer(),
+      assertEventStats(min = 25, max = 25, avg = 25, wtrmark = 5),
+      // watermark should be updated to 25 - 10 = 15
+
+      AddData(inputData, 50),
+      StartStream(Trigger.Once),
+      awaitTermination(),
+      CheckNewAnswer((10, 3)),   // watermark = 15 is used to generate this
+      assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 15),
+      // watermark should be updated to 50 - 10 = 40
+
+      AddData(inputData, 50),
+      StartStream(Trigger.Once),
+      awaitTermination(),
+      CheckNewAnswer((15, 1), (25, 1)), // watermark = 40 is used to generate this
+      assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 40))
+  }
+
+  test("recovery from Spark ver 2.3.1 commit log without commit metadata (SPARK-24699)") {
+    // All event time metrics where watermarking is set
+    val inputData = MemoryStream[Int]
+    val aggWithWatermark = inputData.toDF()
+        .withColumn("eventTime", $"value".cast("timestamp"))
+        .withWatermark("eventTime", "10 seconds")
+        .groupBy(window($"eventTime", "5 seconds") as 'window)
+        .agg(count("*") as 'count)
+        .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+
+    val resourceUri = this.getClass.getResource(
+      "/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/").toURI
+
+    val checkpointDir = Utils.createTempDir().getCanonicalFile
+    // Copy the checkpoint to a temp dir to prevent changes to the original.
+    // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
+    FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+
+    inputData.addData(15)
+    inputData.addData(10, 12, 14)
+
+    testStream(aggWithWatermark)(
+      /*
+
+      Note: The checkpoint was generated using the following input in Spark version 2.3.1
+
+      StartStream(checkpointLocation = "./sql/core/src/test/resources/structured-streaming/" +
+        "checkpoint-version-2.3.1-without-commit-log-metadata/")),
+      AddData(inputData, 15),  // watermark should be updated to 15 - 10 = 5
+      CheckAnswer(),
+      AddData(inputData, 10, 12, 14),  // watermark should stay at 5
+      CheckAnswer(),
+      StopStream,
+
+      // Offset log should have watermark recorded as 5.
+      */
+
+      StartStream(Trigger.Once),
+      awaitTermination(),
+
+      AddData(inputData, 25),
+      StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
+      awaitTermination(),
+      CheckNewAnswer(),
+      assertEventStats(min = 25, max = 25, avg = 25, wtrmark = 5),
+      // watermark should be updated to 25 - 10 = 15
+
+      AddData(inputData, 50),
+      StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
+      awaitTermination(),
+      CheckNewAnswer((10, 3)),   // watermark = 15 is used to generate this
+      assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 15),
+      // watermark should be updated to 50 - 10 = 40
+
+      AddData(inputData, 50),
+      StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
+      awaitTermination(),
+      CheckNewAnswer((15, 1), (25, 1)), // watermark = 40 is used to generate this
+      assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 40))
+  }
+
   test("append mode") {
     val inputData = MemoryStream[Int]
 
@@ -625,10 +727,20 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     true
   }
 
+  /** Assert event stats generated on that last batch with data in it */
   private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = {
-    AssertOnQuery { q =>
+    Execute("AssertEventStats") { q =>
       body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
-      true
+    }
+  }
+
+  /** Assert event stats generated on that last batch with data in it */
+  private def assertEventStats(min: Long, max: Long, avg: Double, wtrmark: Long): AssertOnQuery = {
+    assertEventStats { e =>
+      assert(e.get("min") === formatTimestamp(min), s"min value mismatch")
+      assert(e.get("max") === formatTimestamp(max), s"max value mismatch")
+      assert(e.get("avg") === formatTimestamp(avg.toLong), s"avg value mismatch")
+      assert(e.get("watermark") === formatTimestamp(wtrmark), s"watermark value mismatch")
     }
   }
 
@@ -638,4 +750,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
   private def formatTimestamp(sec: Long): String = {
     timestampFormat.format(new ju.Date(sec * 1000))
   }
+
+  private def awaitTermination(): AssertOnQuery = Execute("AwaitTermination") { q =>
+    q.awaitTermination()
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/61f0ca4f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 4c3fd58..df22bc1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -291,8 +291,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
 
   /** Execute arbitrary code */
   object Execute {
-    def apply(func: StreamExecution => Any): AssertOnQuery =
-      AssertOnQuery(query => { func(query); true }, "Execute")
+    def apply(name: String)(func: StreamExecution => Any): AssertOnQuery =
+      AssertOnQuery(query => { func(query); true }, "name")
+
+    def apply(func: StreamExecution => Any): AssertOnQuery = apply("Execute")(func)
   }
 
   object AwaitEpoch {
@@ -512,7 +514,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
       logInfo(s"Processing test stream action: $action")
       action match {
         case StartStream(trigger, triggerClock, additionalConfs, checkpointLocation) =>
-          verify(currentStream == null, "stream already running")
+          verify(currentStream == null || !currentStream.isActive, "stream already running")
           verify(triggerClock.isInstanceOf[SystemClock]
             || triggerClock.isInstanceOf[StreamManualClock],
             "Use either SystemClock or StreamManualClock to start the stream")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org