Posted to commits@spark.apache.org by td...@apache.org on 2017/03/17 23:14:37 UTC

spark git commit: [SPARK-19873][SS] Record num shuffle partitions in offset log and enforce in next batch.

Repository: spark
Updated Branches:
  refs/heads/master 7de66bae5 -> 3783539d7


[SPARK-19873][SS] Record num shuffle partitions in offset log and enforce in next batch.

## What changes were proposed in this pull request?

If the user changes the shuffle partition number between batches, streaming aggregation will fail, because the state checkpointed by the previous run is partitioned by the old value. This patch records the number of shuffle partitions in the offset log and enforces the recorded value in subsequent batches.

Here are some possible ways the partition number can change between batches (a reproduction sketch follows the list):

- Change "spark.sql.shuffle.partitions"
- Use "repartition" and change the partition number in codes
- RangePartitioner doesn't generate deterministic partitions. Right now it's safe as we disallow sort before aggregation. Not sure if we will add some operators using RangePartitioner in future.
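
To make the first two cases concrete, here is a minimal sketch modeled on the regression test added in `StreamSuite`; the local session, memory sink, and checkpoint path are illustrative assumptions rather than part of this patch, and the comments describe the intended behavior rather than exact error messages:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.count

val spark = SparkSession.builder().master("local[2]").appName("SPARK-19873-sketch").getOrCreate()
import spark.implicits._
implicit val sqlCtx = spark.sqlContext

val input = MemoryStream[Int]
input.addData(1, 2, 3, 4)
val counts = input.toDF().groupBy($"value").agg(count("*"))

// First run: aggregation state is checkpointed across 2 shuffle partitions.
spark.conf.set("spark.sql.shuffle.partitions", "2")
val first = counts.writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("counts")
  .option("checkpointLocation", "/tmp/spark-19873-checkpoint")  // hypothetical scratch path
  .start()
first.processAllAvailable()
first.stop()

// Restart with a different setting. Previously the restarted query planned
// 5 shuffle partitions against state written for 2; with this patch the
// value recorded in the offset log (2) is enforced for the next batch.
spark.conf.set("spark.sql.shuffle.partitions", "5")
input.addData(3, 4, 5)
val second = counts.writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("counts")
  .option("checkpointLocation", "/tmp/spark-19873-checkpoint")
  .start()
second.processAllAvailable()
second.stop()
```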

## How was this patch tested?

- Unit tests
- Manual tests
  - forward compatibility tested by reading the new `OffsetSeqMetadata` JSON with Spark v2.1.0 (a sketch of the new layout follows)
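
As an illustration of that forward-compatibility surface, a small sketch of the new metadata layout using the internals touched by this patch (`OffsetSeqMetadata`, `SQLConf`); the timestamp and partition count are illustrative:

```scala
import org.apache.spark.sql.execution.streaming.OffsetSeqMetadata
import org.apache.spark.sql.internal.SQLConf

// Batch metadata now carries selected confs, keyed by their SQLConf names.
val metadata = OffsetSeqMetadata(
  batchWatermarkMs = 0,
  batchTimestampMs = 1489180209261L,
  conf = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "10"))

// Serializes to roughly:
// {"batchWatermarkMs":0,"batchTimestampMs":1489180209261,"conf":{"spark.sql.shuffle.partitions":"10"}}
// A Spark v2.1.0 offset log entry simply omits the "conf" field and remains readable.
println(metadata.json)
```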

Author: Kunal Khamar <kk...@outlook.com>

Closes #17216 from kunalkhamar/num-partitions.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3783539d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3783539d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3783539d

Branch: refs/heads/master
Commit: 3783539d7ab83a2a632a9f35ca66ae39d01c28b6
Parents: 7de66ba
Author: Kunal Khamar <kk...@outlook.com>
Authored: Fri Mar 17 16:16:22 2017 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Mar 17 16:16:22 2017 -0700

----------------------------------------------------------------------
 .../sql/execution/streaming/OffsetSeq.scala     |   8 +-
 .../execution/streaming/StreamExecution.scala   |  60 ++++++++---
 .../sql/streaming/StreamingQueryManager.scala   |   8 +-
 .../checkpoint-version-2.1.0/metadata           |   1 +
 .../checkpoint-version-2.1.0/offsets/0          |   3 +
 .../checkpoint-version-2.1.0/offsets/1          |   3 +
 .../checkpoint-version-2.1.0/state/0/0/1.delta  | Bin 0 -> 46 bytes
 .../checkpoint-version-2.1.0/state/0/0/2.delta  | Bin 0 -> 46 bytes
 .../checkpoint-version-2.1.0/state/0/1/1.delta  | Bin 0 -> 79 bytes
 .../checkpoint-version-2.1.0/state/0/1/2.delta  | Bin 0 -> 79 bytes
 .../checkpoint-version-2.1.0/state/0/2/1.delta  | Bin 0 -> 79 bytes
 .../checkpoint-version-2.1.0/state/0/2/2.delta  | Bin 0 -> 79 bytes
 .../checkpoint-version-2.1.0/state/0/3/1.delta  | Bin 0 -> 73 bytes
 .../checkpoint-version-2.1.0/state/0/3/2.delta  | Bin 0 -> 79 bytes
 .../checkpoint-version-2.1.0/state/0/4/1.delta  | Bin 0 -> 79 bytes
 .../checkpoint-version-2.1.0/state/0/4/2.delta  | Bin 0 -> 46 bytes
 .../checkpoint-version-2.1.0/state/0/5/1.delta  | Bin 0 -> 46 bytes
 .../checkpoint-version-2.1.0/state/0/5/2.delta  | Bin 0 -> 46 bytes
 .../checkpoint-version-2.1.0/state/0/6/1.delta  | Bin 0 -> 46 bytes
 .../checkpoint-version-2.1.0/state/0/6/2.delta  | Bin 0 -> 79 bytes
 .../checkpoint-version-2.1.0/state/0/7/1.delta  | Bin 0 -> 46 bytes
 .../checkpoint-version-2.1.0/state/0/7/2.delta  | Bin 0 -> 79 bytes
 .../checkpoint-version-2.1.0/state/0/8/1.delta  | Bin 0 -> 46 bytes
 .../checkpoint-version-2.1.0/state/0/8/2.delta  | Bin 0 -> 46 bytes
 .../checkpoint-version-2.1.0/state/0/9/1.delta  | Bin 0 -> 46 bytes
 .../checkpoint-version-2.1.0/state/0/9/2.delta  | Bin 0 -> 79 bytes
 .../execution/streaming/OffsetSeqLogSuite.scala |  38 +++++--
 .../spark/sql/streaming/StreamSuite.scala       | 101 ++++++++++++++++++-
 .../streaming/StreamingQueryManagerSuite.scala  |  10 --
 .../test/DataStreamReaderWriterSuite.scala      |  22 ++--
 30 files changed, 207 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index e5a1997..8249ada 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.streaming
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
-
 /**
  * An ordered collection of offsets, used to track the progress of processing data from one or more
  * [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance
@@ -70,8 +69,12 @@ object OffsetSeq {
  * bound the lateness of data that will processed. Time unit: milliseconds
  * @param batchTimestampMs: The current batch processing timestamp.
  * Time unit: milliseconds
+ * @param conf: Additional conf_s to be persisted across batches, e.g. number of shuffle partitions.
  */
-case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
+case class OffsetSeqMetadata(
+    batchWatermarkMs: Long = 0,
+    batchTimestampMs: Long = 0,
+    conf: Map[String, String] = Map.empty) {
   def json: String = Serialization.write(this)(OffsetSeqMetadata.format)
 }
 
@@ -79,4 +82,3 @@ object OffsetSeqMetadata {
   private implicit val format = Serialization.formats(NoTypeHints)
   def apply(json: String): OffsetSeqMetadata = Serialization.read[OffsetSeqMetadata](json)
 }
-

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5292638..40faddc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Curre
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 
@@ -117,7 +118,9 @@ class StreamExecution(
   }
 
   /** Metadata associated with the offset seq of a batch in the query. */
-  protected var offsetSeqMetadata = OffsetSeqMetadata()
+  protected var offsetSeqMetadata = OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0,
+    conf = Map(SQLConf.SHUFFLE_PARTITIONS.key ->
+      sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS).toString))
 
   override val id: UUID = UUID.fromString(streamMetadata.id)
 
@@ -256,6 +259,15 @@ class StreamExecution(
       updateStatusMessage("Initializing sources")
       // force initialization of the logical plan so that the sources can be created
       logicalPlan
+
+      // Isolated spark session to run the batches with.
+      val sparkSessionToRunBatches = sparkSession.cloneSession()
+      // Adaptive execution can change num shuffle partitions, disallow
+      sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
+      offsetSeqMetadata = OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0,
+        conf = Map(SQLConf.SHUFFLE_PARTITIONS.key ->
+          sparkSessionToRunBatches.conf.get(SQLConf.SHUFFLE_PARTITIONS.key)))
+
       if (state.compareAndSet(INITIALIZING, ACTIVE)) {
         // Unblock `awaitInitialization`
         initializationLatch.countDown()
@@ -268,7 +280,7 @@ class StreamExecution(
               reportTimeTaken("triggerExecution") {
                 if (currentBatchId < 0) {
                   // We'll do this initialization only once
-                  populateStartOffsets()
+                  populateStartOffsets(sparkSessionToRunBatches)
                   logDebug(s"Stream running from $committedOffsets to $availableOffsets")
                 } else {
                   constructNextBatch()
@@ -276,7 +288,7 @@ class StreamExecution(
                 if (dataAvailable) {
                   currentStatus = currentStatus.copy(isDataAvailable = true)
                   updateStatusMessage("Processing new data")
-                  runBatch()
+                  runBatch(sparkSessionToRunBatches)
                 }
               }
 
@@ -381,13 +393,32 @@ class StreamExecution(
    *  - committedOffsets
    *  - availableOffsets
    */
-  private def populateStartOffsets(): Unit = {
+  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
     offsetLog.getLatest() match {
       case Some((batchId, nextOffsets)) =>
         logInfo(s"Resuming streaming query, starting with batch $batchId")
         currentBatchId = batchId
         availableOffsets = nextOffsets.toStreamProgress(sources)
-        offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
+
+        // update offset metadata
+        nextOffsets.metadata.foreach { metadata =>
+          val shufflePartitionsSparkSession: Int =
+            sparkSessionToRunBatches.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+          val shufflePartitionsToUse = metadata.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, {
+            // For backward compatibility, if # partitions was not recorded in the offset log,
+            // then ensure it is not missing. The new value is picked up from the conf.
+            logWarning("Number of shuffle partitions from previous run not found in checkpoint. "
+              + s"Using the value from the conf, $shufflePartitionsSparkSession partitions.")
+            shufflePartitionsSparkSession
+          })
+          offsetSeqMetadata = OffsetSeqMetadata(
+            metadata.batchWatermarkMs, metadata.batchTimestampMs,
+            metadata.conf + (SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsToUse.toString))
+          // Update conf with correct number of shuffle partitions
+          sparkSessionToRunBatches.conf.set(
+            SQLConf.SHUFFLE_PARTITIONS.key, shufflePartitionsToUse.toString)
+        }
+
         logDebug(s"Found possibly unprocessed offsets $availableOffsets " +
           s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}")
 
@@ -444,8 +475,7 @@ class StreamExecution(
       }
     }
     if (hasNewData) {
-      // Current batch timestamp in milliseconds
-      offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
+      var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
       // Update the eventTime watermark if we find one in the plan.
       if (lastExecution != null) {
         lastExecution.executedPlan.collect {
@@ -453,16 +483,19 @@ class StreamExecution(
             logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
             e.eventTimeStats.value.max - e.delayMs
         }.headOption.foreach { newWatermarkMs =>
-          if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
+          if (newWatermarkMs > batchWatermarkMs) {
             logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
-            offsetSeqMetadata.batchWatermarkMs = newWatermarkMs
+            batchWatermarkMs = newWatermarkMs
           } else {
             logDebug(
               s"Event time didn't move: $newWatermarkMs < " +
-                s"${offsetSeqMetadata.batchWatermarkMs}")
+                s"$batchWatermarkMs")
           }
         }
       }
+      offsetSeqMetadata = offsetSeqMetadata.copy(
+        batchWatermarkMs = batchWatermarkMs,
+        batchTimestampMs = triggerClock.getTimeMillis()) // Current batch timestamp in milliseconds
 
       updateStatusMessage("Writing offsets to log")
       reportTimeTaken("walCommit") {
@@ -505,8 +538,9 @@ class StreamExecution(
 
   /**
    * Processes any data available between `availableOffsets` and `committedOffsets`.
+   * @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with.
    */
-  private def runBatch(): Unit = {
+  private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
     // Request unprocessed data from all sources.
     newData = reportTimeTaken("getBatch") {
       availableOffsets.flatMap {
@@ -551,7 +585,7 @@ class StreamExecution(
 
     reportTimeTaken("queryPlanning") {
       lastExecution = new IncrementalExecution(
-        sparkSession,
+        sparkSessionToRunBatch,
         triggerLogicalPlan,
         outputMode,
         checkpointFile("state"),
@@ -561,7 +595,7 @@ class StreamExecution(
     }
 
     val nextBatch =
-      new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema))
+      new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema))
 
     reportTimeTaken("addBatch") {
       sink.addBatch(currentBatchId, nextBatch)

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 38edb40..7810d9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.execution.streaming._
@@ -40,7 +41,7 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
  */
 @Experimental
 @InterfaceStability.Evolving
-class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
+class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {
 
   private[sql] val stateStoreCoordinator =
     StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
@@ -234,9 +235,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
     }
 
     if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
-      throw new AnalysisException(
-        s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
-          "is not supported in streaming DataFrames/Datasets")
+      logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
+          "is not supported in streaming DataFrames/Datasets and will be disabled.")
     }
 
     new StreamingQueryWrapper(new StreamExecution(

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata
new file mode 100644
index 0000000..3492220
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata
@@ -0,0 +1 @@
+{"id":"dddc5e7f-1e71-454c-8362-de184444fb5a"}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0
new file mode 100644
index 0000000..cbde042
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1489180207737}
+0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1
new file mode 100644
index 0000000..10b5774
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1489180209261}
+2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta
new file mode 100644
index 0000000..6352978
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta
new file mode 100644
index 0000000..6352978
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta
new file mode 100644
index 0000000..7dc49cb
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta
new file mode 100644
index 0000000..8b566e8
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta
new file mode 100644
index 0000000..ca2a7ed
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/2.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/2.delta
new file mode 100644
index 0000000..361f2db
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/2.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/1.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/1.delta
new file mode 100644
index 0000000..4c8804c
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/1.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/2.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/2.delta
new file mode 100644
index 0000000..7d3e07f
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/2.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/1.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/1.delta
new file mode 100644
index 0000000..fe521b8
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/1.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta
new file mode 100644
index 0000000..6352978
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/1.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/1.delta
new file mode 100644
index 0000000..6352978
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/1.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta
new file mode 100644
index 0000000..6352978
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta
new file mode 100644
index 0000000..6352978
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/2.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/2.delta
new file mode 100644
index 0000000..e69925c
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/2.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/1.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/1.delta
new file mode 100644
index 0000000..6352978
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/1.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta
new file mode 100644
index 0000000..36397a3
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta
new file mode 100644
index 0000000..6352978
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta
new file mode 100644
index 0000000..6352978
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta
new file mode 100644
index 0000000..6352978
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta
new file mode 100644
index 0000000..0c9b6ac
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta differ

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index f7f0dad..dc55632 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.util.stringToFile
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 
 class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
@@ -29,12 +30,37 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
   case class StringOffset(override val json: String) extends Offset
 
   test("OffsetSeqMetadata - deserialization") {
-    assert(OffsetSeqMetadata(0, 0) === OffsetSeqMetadata("""{}"""))
-    assert(OffsetSeqMetadata(1, 0) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
-    assert(OffsetSeqMetadata(0, 2) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
-    assert(
-      OffsetSeqMetadata(1, 2) ===
-        OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+    val key = SQLConf.SHUFFLE_PARTITIONS.key
+
+    def getConfWith(shufflePartitions: Int): Map[String, String] = {
+      Map(key -> shufflePartitions.toString)
+    }
+
+    // None set
+    assert(OffsetSeqMetadata(0, 0, Map.empty) === OffsetSeqMetadata("""{}"""))
+
+    // One set
+    assert(OffsetSeqMetadata(1, 0, Map.empty) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
+    assert(OffsetSeqMetadata(0, 2, Map.empty) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
+    assert(OffsetSeqMetadata(0, 0, getConfWith(shufflePartitions = 2)) ===
+      OffsetSeqMetadata(s"""{"conf": {"$key":2}}"""))
+
+    // Two set
+    assert(OffsetSeqMetadata(1, 2, Map.empty) ===
+      OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+    assert(OffsetSeqMetadata(1, 0, getConfWith(shufflePartitions = 3)) ===
+      OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"conf": {"$key":3}}"""))
+    assert(OffsetSeqMetadata(0, 2, getConfWith(shufflePartitions = 3)) ===
+      OffsetSeqMetadata(s"""{"batchTimestampMs":2,"conf": {"$key":3}}"""))
+
+    // All set
+    assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) ===
+      OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": {"$key":3}}"""))
+
+    // Drop unknown fields
+    assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) ===
+      OffsetSeqMetadata(
+        s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": {"$key":3}},"unknown":1"""))
   }
 
   test("OffsetSeqLog - serialization - deserialization") {

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 6dfcd8b..e867fc4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -17,17 +17,20 @@
 
 package org.apache.spark.sql.streaming
 
-import java.io.{InterruptedIOException, IOException}
+import java.io.{File, InterruptedIOException, IOException}
 import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
 
 import scala.reflect.ClassTag
 import scala.util.control.ControlThrowable
 
+import org.apache.commons.io.FileUtils
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.StreamSourceProvider
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
     query.stop()
     assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+    val inputData = MemoryStream[(Int, Int)]
+    val agg = inputData.toDS().groupBy("_1").count()
+
+    testStream(agg, OutputMode.Complete())(
+      AddData(inputData, (1, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+      CheckAnswer((1, 1), (2, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+      CheckAnswer((1, 1), (2, 2), (3, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (1, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+      CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("recover from a Spark v2.1 checkpoint") {
+    var inputData: MemoryStream[Int] = null
+    var query: DataStreamWriter[Row] = null
+
+    def prepareMemoryStream(): Unit = {
+      inputData = MemoryStream[Int]
+      inputData.addData(1, 2, 3, 4)
+      inputData.addData(3, 4, 5, 6)
+      inputData.addData(5, 6, 7, 8)
+
+      query = inputData
+        .toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .writeStream
+        .outputMode("complete")
+        .format("memory")
+    }
+
+    // Get an existing checkpoint generated by Spark v2.1.
+    // v2.1 does not record # shuffle partitions in the offset metadata.
+    val resourceUri =
+      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+    val checkpointDir = new File(resourceUri)
+
+    // 1 - Test if recovery from the checkpoint is successful.
+    prepareMemoryStream()
+    withTempDir { dir =>
+      // Copy the checkpoint to a temp dir to prevent changes to the original.
+      // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
+      FileUtils.copyDirectory(checkpointDir, dir)
+
+      // Checkpoint data was generated by a query with 10 shuffle partitions.
+      // In order to test reading from the checkpoint, the checkpoint must have two or more batches,
+      // since the last batch may be rerun.
+      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+        var streamingQuery: StreamingQuery = null
+        try {
+          streamingQuery =
+            query.queryName("counts").option("checkpointLocation", dir.getCanonicalPath).start()
+          streamingQuery.processAllAvailable()
+          inputData.addData(9)
+          streamingQuery.processAllAvailable()
+
+          QueryTest.checkAnswer(spark.table("counts").toDF(),
+            Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
+            Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil)
+        } finally {
+          if (streamingQuery ne null) {
+            streamingQuery.stop()
+          }
+        }
+      }
+    }
+
+    // 2 - Check recovery with wrong num shuffle partitions
+    prepareMemoryStream()
+    withTempDir { dir =>
+      FileUtils.copyDirectory(checkpointDir, dir)
+
+      // Since the number of partitions is greater than 10, should throw exception.
+      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") {
+        var streamingQuery: StreamingQuery = null
+        try {
+          intercept[StreamingQueryException] {
+            streamingQuery =
+              query.queryName("badQuery").option("checkpointLocation", dir.getCanonicalPath).start()
+            streamingQuery.processAllAvailable()
+          }
+        } finally {
+          if (streamingQuery ne null) {
+            streamingQuery.stop()
+          }
+        }
+      }
+    }
+  }
 }
 
 abstract class FakeSource extends StreamSourceProvider {

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index f05e9d1..b49efa6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -239,16 +239,6 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
-  test("SPARK-19268: Adaptive query execution should be disallowed") {
-    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
-      val e = intercept[AnalysisException] {
-        MemoryStream[Int].toDS.writeStream.queryName("test-query").format("memory").start()
-      }
-      assert(e.getMessage.contains(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key) &&
-        e.getMessage.contains("not supported"))
-    }
-  }
-
   /** Run a body of code by defining a query on each dataset */
   private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = {
     failAfter(streamingTimeout) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3783539d/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index f61dcdc..341ab0e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
 import scala.concurrent.duration._
 
 import org.apache.hadoop.fs.Path
+import org.mockito.Matchers.{any, eq => meq}
 import org.mockito.Mockito._
 import org.scalatest.BeforeAndAfter
 
@@ -370,21 +371,22 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .option("checkpointLocation", checkpointLocationURI.toString)
       .trigger(ProcessingTime(10.seconds))
       .start()
+    q.processAllAvailable()
     q.stop()
 
     verify(LastOptions.mockStreamSourceProvider).createSource(
-      spark.sqlContext,
-      s"$checkpointLocationURI/sources/0",
-      None,
-      "org.apache.spark.sql.streaming.test",
-      Map.empty)
+      any(),
+      meq(s"$checkpointLocationURI/sources/0"),
+      meq(None),
+      meq("org.apache.spark.sql.streaming.test"),
+      meq(Map.empty))
 
     verify(LastOptions.mockStreamSourceProvider).createSource(
-      spark.sqlContext,
-      s"$checkpointLocationURI/sources/1",
-      None,
-      "org.apache.spark.sql.streaming.test",
-      Map.empty)
+      any(),
+      meq(s"$checkpointLocationURI/sources/1"),
+      meq(None),
+      meq("org.apache.spark.sql.streaming.test"),
+      meq(Map.empty))
   }
 
   private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath

