Posted to reviews@spark.apache.org by kunalkhamar <gi...@git.apache.org> on 2017/03/08 22:50:19 UTC

[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

GitHub user kunalkhamar opened a pull request:

    https://github.com/apache/spark/pull/17216

    [SPARK-19873][SS] Record num shuffle partitions in offset log and enforce in next batch.

    ## What changes were proposed in this pull request?
    
    If the user changes the number of shuffle partitions between batches, streaming aggregation will fail.
    
    Here are some possible cases:
    
    - Change "spark.sql.shuffle.partitions"
    - Use "repartition" and change the partition number in code
    - RangePartitioner doesn't generate deterministic partitions. Right now it's safe as we disallow sort before aggregation. Not sure if we will add some operators using RangePartitioner in the future.
    
    ## How was this patch tested?
    
    Unit tests
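
A minimal sketch of why changing the partition count breaks stateful aggregation, assuming state is kept per partition and rows are routed by a hash of the grouping key modulo the partition count. `PartitionDrift`, `partitionFor`, and `movedKeys` are hypothetical names for illustration, and the plain `hashCode` routing is a simplification rather than Spark's actual partitioner.

```scala
// Simplified stand-in for hash partitioning; NOT Spark's actual partitioner.
object PartitionDrift {
  // Non-negative modulo so negative hash codes still land in [0, numPartitions).
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Keys whose target partition changes when the partition count changes.
  def movedKeys[K](keys: Seq[K], before: Int, after: Int): Seq[K] =
    keys.filter(k => partitionFor(k, before) != partitionFor(k, after))
}
```

With integer keys 1, 2, and 3, moving from 2 to 5 partitions relocates keys 2 and 3 under this scheme, so a restarted query would look for their running counts in partitions that never held them.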


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kunalkhamar/spark num-partitions

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17216.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #17216
    
----
commit 12f5fd30229e441355a05290ed124263c1429acc
Author: Kunal Khamar <kk...@outlook.com>
Date:   2017-03-08T21:29:02Z

    Record num shuffle partitions in offset log and enforce in next batch.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74290/



[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74353 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74353/testReport)** for PR 17216 at commit [`1cacd32`](https://github.com/apache/spark/commit/1cacd32e21f074c2ffd79f1ce4390bd9f113da0c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74758 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74758/testReport)** for PR 17216 at commit [`3abe0a0`](https://github.com/apache/spark/commit/3abe0a0fcb60def5eb14697fa6f78eee318b77b0).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106058976
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -437,25 +464,28 @@ class StreamExecution(
           }
         }
         if (hasNewData) {
    -      // Current batch timestamp in milliseconds
    -      offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
    +      var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
           // Update the eventTime watermark if we find one in the plan.
           if (lastExecution != null) {
             lastExecution.executedPlan.collect {
               case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
                 logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
                 e.eventTimeStats.value.max - e.delayMs
             }.headOption.foreach { newWatermarkMs =>
    -          if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
    +          if (newWatermarkMs > batchWatermarkMs) {
                 logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
    -            offsetSeqMetadata.batchWatermarkMs = newWatermarkMs
    +            batchWatermarkMs = newWatermarkMs
               } else {
                 logDebug(
                   s"Event time didn't move: $newWatermarkMs < " +
    -                s"${offsetSeqMetadata.batchWatermarkMs}")
    +                s"$batchWatermarkMs")
               }
             }
           }
    +      offsetSeqMetadata = OffsetSeqMetadata(
    --- End diff --
    
    You can make this `offsetSeqMetadata.copy(batchWatermarkMs = batchWatermarkMs, batchTimestampMs = triggerClock.getTimeMillis())`
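
The `copy` suggestion can be sketched in isolation as follows. `BatchMetadata` and `MetadataOps.advance` are hypothetical stand-ins, with field names borrowed from the diffs in this thread; this is not the actual `OffsetSeqMetadata` definition, only an illustration of updating an immutable case class.

```scala
// Hypothetical stand-in for OffsetSeqMetadata, with immutable fields.
final case class BatchMetadata(
    batchWatermarkMs: Long = 0L,
    batchTimestampMs: Long = 0L,
    conf: Map[String, String] = Map.empty)

object MetadataOps {
  // Returns a new instance; fields not named in copy() carry over unchanged.
  // The watermark only moves forward, mirroring the check in the diff.
  def advance(prev: BatchMetadata, observedWatermarkMs: Long, nowMs: Long): BatchMetadata =
    prev.copy(
      batchWatermarkMs = math.max(prev.batchWatermarkMs, observedWatermarkMs),
      batchTimestampMs = nowMs)
}
```

Because `copy` builds a new value instead of mutating the old one, the fields no longer need to be declared as `var`.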



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106059496
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
         query.stop()
         assert(query.exception.isEmpty)
       }
    +
    +  test("SPARK-19873: streaming aggregation with change in number of partitions") {
    +    val inputData = MemoryStream[(Int, Int)]
    +    val agg = inputData.toDS().groupBy("_1").count()
    +
    +    testStream(agg, OutputMode.Complete())(
    +      AddData(inputData, (1, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
    +      CheckAnswer((1, 1), (2, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
    +      CheckAnswer((1, 1), (2, 2), (3, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (1, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
    +      CheckAnswer((1, 2), (2, 2), (3, 2)))
    +  }
    +
    +  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
    +    var inputData: MemoryStream[Int] = null
    +    var query: DataStreamWriter[Row] = null
    +
    +    def init(): Unit = {
    +      inputData = MemoryStream[Int]
    +      inputData.addData(1, 2, 3, 4)
    +      inputData.addData(3, 4, 5, 6)
    +      inputData.addData(5, 6, 7, 8)
    +
    +      query = inputData
    +        .toDF()
    +        .groupBy($"value")
    +        .agg(count("*"))
    +        .writeStream
    +        .outputMode("complete")
    +        .format("memory")
    +    }
    +
    +    // Get an existing checkpoint generated by Spark v2.1.
    +    // v2.1 does not record # shuffle partitions in the offset metadata.
    +    val resourceUri =
    +      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
    +    val checkpointDir = new File(resourceUri)
    +
    +    // 1 - Test if recovery from the checkpoint is successful.
    +    init()
    +    withTempDir(dir => {
    --- End diff --
    
    nit: `withTempDir { dir => `
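
The brace form suggested here is the usual call style for a helper that lends a resource to a block and cleans up afterwards. A minimal sketch follows, assuming a helper that creates and deletes a temporary directory; `LoanPattern.withScratchDir` is a hypothetical stand-in and not Spark's `withTempDir`.

```scala
import java.io.File
import java.nio.file.Files

object LoanPattern {
  // Lends a fresh temporary directory to `body`, then deletes it even if `body` throws.
  def withScratchDir[T](body: File => T): T = {
    val dir = Files.createTempDirectory("scratch").toFile
    try body(dir)
    finally deleteRecursively(dir)
  }

  // Deletes children first; listFiles returns null for non-directories.
  private def deleteRecursively(f: File): Unit = {
    Option(f.listFiles).foreach(_.foreach(deleteRecursively))
    f.delete()
    ()
  }
}
```

A call then reads as `LoanPattern.withScratchDir { dir => ... }`, with no extra parentheses wrapping the function literal.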



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r105310100
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala ---
    @@ -71,7 +71,10 @@ object OffsetSeq {
      * @param batchTimestampMs: The current batch processing timestamp.
      * Time unit: milliseconds
      */
    -case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
    +case class OffsetSeqMetadata(
    +    var batchWatermarkMs: Long = 0,
    +    var batchTimestampMs: Long = 0,
    +    var numShufflePartitions: Int = 0) {
    --- End diff --
    
    @zsxwing Changed to a map
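
A rough sketch of how a map of recorded confs could support the backward compatibility discussed in this thread: a value recorded with the checkpoint takes precedence, and the session's value is used only when the offset log predates the change. `OffsetConf` and `shufflePartitions` are hypothetical names, and the fallback rule is an assumption drawn from the quoted diffs rather than the merged implementation.

```scala
object OffsetConf {
  // Config key named in the pull request description.
  val ShufflePartitionsKey = "spark.sql.shuffle.partitions"

  // Prefer the value stored in the offset log; fall back to the session
  // value for checkpoints that never recorded one (e.g. written by v2.1).
  def shufflePartitions(recorded: Map[String, String], sessionValue: Int): Int =
    recorded.get(ShufflePartitionsKey).map(_.toInt).getOrElse(sessionValue)
}
```

Under this rule, a restart with a session value of 15 against a log that recorded 10 would keep using 10, while an older log with no recorded value would pick up 15.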



[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    LGTM. Merging it to master.



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r105781560
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
         query.stop()
         assert(query.exception.isEmpty)
       }
    +
    +  test("SPARK-19873: streaming aggregation with change in number of partitions") {
    +    val inputData = MemoryStream[(Int, Int)]
    +    val agg = inputData.toDS().groupBy("_1").count()
    +
    +    testStream(agg, OutputMode.Complete())(
    +      AddData(inputData, (1, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
    +      CheckAnswer((1, 1), (2, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
    +      CheckAnswer((1, 1), (2, 2), (3, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (1, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
    +      CheckAnswer((1, 2), (2, 2), (3, 2)))
    +  }
    +
    +  test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") {
    +    val inputData = MemoryStream[Int]
    +    inputData.addData(1, 2, 3, 4)
    +    inputData.addData(3, 4, 5, 6)
    +    inputData.addData(5, 6, 7, 8)
    +
    +    val resourceUri =
    +      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
    +    val checkpointDir = new File(resourceUri).getCanonicalPath
    +    val query = inputData
    +      .toDF()
    +      .groupBy($"value")
    +      .agg(count("*"))
    +      .writeStream
    +      .queryName("counts")
    +      .outputMode("complete")
    +      .option("checkpointLocation", checkpointDir)
    +      .format("memory")
    +
    +    // Checkpoint data was generated by a query with 10 shuffle partitions.
    +    // Test if recovery from checkpoint is successful.
    +    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
    +      query.start().processAllAvailable()
    --- End diff --
    
    It's not clear that this would actually re-execute a batch. Unless a batch is executed, this does not test anything. So how about adding more data after `processAllAvailable()`, to ensure that at least one batch is actually executed?



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106757213
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -256,6 +259,15 @@ class StreamExecution(
           updateStatusMessage("Initializing sources")
           // force initialization of the logical plan so that the sources can be created
           logicalPlan
    +
    +      // Isolated spark session to run the batches with.
    +      val sparkSessionToRunBatches = sparkSession.cloneSession()
    +      // Adaptive execution can change num shuffle partitions, disallow
    +      sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
    +      offsetSeqMetadata = OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0,
    --- End diff --
    
    nit: remove line.



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r105779702
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala ---
    @@ -70,13 +69,16 @@ object OffsetSeq {
      * bound the lateness of data that will processed. Time unit: milliseconds
      * @param batchTimestampMs: The current batch processing timestamp.
      * Time unit: milliseconds
    + * @param conf: Additional conf_s to be persisted across batches, e.g. number of shuffle partitions.
      */
    -case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
    +case class OffsetSeqMetadata(
    +    var batchWatermarkMs: Long = 0,
    --- End diff --
    
    Do you know why we have this as var?



[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74564/



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by uncleGen <gi...@git.apache.org>.
Github user uncleGen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r105069281
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -380,7 +382,20 @@ class StreamExecution(
             logInfo(s"Resuming streaming query, starting with batch $batchId")
             currentBatchId = batchId
             availableOffsets = nextOffsets.toStreamProgress(sources)
    -        offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
    +        val numShufflePartitionsFromConf = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)
    +        offsetSeqMetadata = nextOffsets
    +          .metadata
    +          .getOrElse(OffsetSeqMetadata(0, 0, numShufflePartitionsFromConf))
    +
    +        /*
    +         * For backwards compatibility, if # partitions was not recorded in the offset log, then
    +         * ensure it is non-zero. The new value is picked up from the conf.
    +         */
    --- End diff --
    
    For inline comments within the code, use `//` and not `/* ... */`.



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106059434
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
         query.stop()
         assert(query.exception.isEmpty)
       }
    +
    +  test("SPARK-19873: streaming aggregation with change in number of partitions") {
    +    val inputData = MemoryStream[(Int, Int)]
    +    val agg = inputData.toDS().groupBy("_1").count()
    +
    +    testStream(agg, OutputMode.Complete())(
    +      AddData(inputData, (1, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
    +      CheckAnswer((1, 1), (2, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
    +      CheckAnswer((1, 1), (2, 2), (3, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (1, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
    +      CheckAnswer((1, 2), (2, 2), (3, 2)))
    +  }
    +
    +  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
    +    var inputData: MemoryStream[Int] = null
    +    var query: DataStreamWriter[Row] = null
    +
    +    def init(): Unit = {
    +      inputData = MemoryStream[Int]
    +      inputData.addData(1, 2, 3, 4)
    +      inputData.addData(3, 4, 5, 6)
    +      inputData.addData(5, 6, 7, 8)
    +
    +      query = inputData
    +        .toDF()
    +        .groupBy($"value")
    +        .agg(count("*"))
    +        .writeStream
    +        .outputMode("complete")
    +        .format("memory")
    +    }
    +
    +    // Get an existing checkpoint generated by Spark v2.1.
    +    // v2.1 does not record # shuffle partitions in the offset metadata.
    +    val resourceUri =
    +      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
    +    val checkpointDir = new File(resourceUri)
    +
    +    // 1 - Test if recovery from the checkpoint is successful.
    +    init()
    +    withTempDir(dir => {
    +      // Copy the checkpoint to a temp dir to prevent changes to the original.
    +      // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
    +      FileUtils.copyDirectory(checkpointDir, dir)
    +
    +      // Checkpoint data was generated by a query with 10 shuffle partitions.
    +      // In order to test reading from the checkpoint, the checkpoint must have two or more batches,
    +      // since the last batch may be rerun.
    +      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
    +        var streamingQuery: StreamingQuery = null
    +        try {
    +          streamingQuery =
    +            query.queryName("counts").option("checkpointLocation", dir.getCanonicalPath).start()
    +          streamingQuery.processAllAvailable()
    +          inputData.addData(9)
    +          streamingQuery.processAllAvailable()
    +
    +          QueryTest.checkAnswer(spark.table("counts").toDF(),
    +            Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
    +            Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil)
    +        } finally {
    +          if (streamingQuery ne null) {
    +            streamingQuery.stop()
    +          }
    +        }
    +      }
    +    })
    +
    +    // 2 - Check recovery with wrong num shuffle partitions
    +    init()
    +    withTempDir(dir => {
    +      FileUtils.copyDirectory(checkpointDir, dir)
    +
    +      // Since the number of partitions is greater than 10, should throw exception.
    +      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") {
    +        var streamingQuery: StreamingQuery = null
    +        try {
    +          intercept[StreamingQueryException] {
    --- End diff --
    
    What is the error message?



[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Merged build finished. Test PASSed.



[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by uncleGen <gi...@git.apache.org>.
Github user uncleGen commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Does this PR mix in some test file?



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r105781762
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
         query.stop()
         assert(query.exception.isEmpty)
       }
    +
    +  test("SPARK-19873: streaming aggregation with change in number of partitions") {
    +    val inputData = MemoryStream[(Int, Int)]
    +    val agg = inputData.toDS().groupBy("_1").count()
    +
    +    testStream(agg, OutputMode.Complete())(
    +      AddData(inputData, (1, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
    +      CheckAnswer((1, 1), (2, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
    +      CheckAnswer((1, 1), (2, 2), (3, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (1, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
    +      CheckAnswer((1, 2), (2, 2), (3, 2)))
    +  }
    +
    +  test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") {
    +    val inputData = MemoryStream[Int]
    +    inputData.addData(1, 2, 3, 4)
    +    inputData.addData(3, 4, 5, 6)
    +    inputData.addData(5, 6, 7, 8)
    +
    +    val resourceUri =
    --- End diff --
    
    Can you add a comment saying that this starts the query with existing checkpoints generated by 2.1, which do not have shuffle partitions recorded?



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106266038
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
         query.stop()
         assert(query.exception.isEmpty)
       }
    +
    +  test("SPARK-19873: streaming aggregation with change in number of partitions") {
    +    val inputData = MemoryStream[(Int, Int)]
    +    val agg = inputData.toDS().groupBy("_1").count()
    +
    +    testStream(agg, OutputMode.Complete())(
    +      AddData(inputData, (1, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
    +      CheckAnswer((1, 1), (2, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
    +      CheckAnswer((1, 1), (2, 2), (3, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (1, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
    +      CheckAnswer((1, 2), (2, 2), (3, 2)))
    +  }
    +
    +  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
    +    var inputData: MemoryStream[Int] = null
    +    var query: DataStreamWriter[Row] = null
    +
    +    def init(): Unit = {
    +      inputData = MemoryStream[Int]
    +      inputData.addData(1, 2, 3, 4)
    +      inputData.addData(3, 4, 5, 6)
    +      inputData.addData(5, 6, 7, 8)
    +
    +      query = inputData
    +        .toDF()
    +        .groupBy($"value")
    +        .agg(count("*"))
    +        .writeStream
    +        .outputMode("complete")
    +        .format("memory")
    +    }
    +
    +    // Get an existing checkpoint generated by Spark v2.1.
    +    // v2.1 does not record # shuffle partitions in the offset metadata.
    +    val resourceUri =
    +      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
    +    val checkpointDir = new File(resourceUri)
    +
    +    // 1 - Test if recovery from the checkpoint is successful.
    +    init()
    +    withTempDir(dir => {
    +      // Copy the checkpoint to a temp dir to prevent changes to the original.
    +      // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
    +      FileUtils.copyDirectory(checkpointDir, dir)
    +
    +      // Checkpoint data was generated by a query with 10 shuffle partitions.
    +      // In order to test reading from the checkpoint, the checkpoint must have two or more batches,
    +      // since the last batch may be rerun.
    +      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
    +        var streamingQuery: StreamingQuery = null
    +        try {
    +          streamingQuery =
    +            query.queryName("counts").option("checkpointLocation", dir.getCanonicalPath).start()
    +          streamingQuery.processAllAvailable()
    +          inputData.addData(9)
    +          streamingQuery.processAllAvailable()
    +
    +          QueryTest.checkAnswer(spark.table("counts").toDF(),
    +            Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
    +            Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil)
    +        } finally {
    +          if (streamingQuery ne null) {
    +            streamingQuery.stop()
    +          }
    +        }
    +      }
    +    })
    +
    +    // 2 - Check recovery with wrong num shuffle partitions
    +    init()
    +    withTempDir(dir => {
    +      FileUtils.copyDirectory(checkpointDir, dir)
    +
    +      // Since the number of partitions is greater than 10, should throw exception.
    +      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") {
    +        var streamingQuery: StreamingQuery = null
    +        try {
    +          intercept[StreamingQueryException] {
    --- End diff --
    
    https://github.com/apache/spark/pull/17216#discussion_r106033678




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74760 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74760/testReport)** for PR 17216 at commit [`a0c71af`](https://github.com/apache/spark/commit/a0c71af4ad2dd99815f85e3b0c842419beeea67c).




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74754 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74754/testReport)** for PR 17216 at commit [`3ae4414`](https://github.com/apache/spark/commit/3ae44148913dfab274dc527f7c044070a39f1f61).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106043061
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
         query.stop()
         assert(query.exception.isEmpty)
       }
    +
    +  test("SPARK-19873: streaming aggregation with change in number of partitions") {
    +    val inputData = MemoryStream[(Int, Int)]
    +    val agg = inputData.toDS().groupBy("_1").count()
    +
    +    testStream(agg, OutputMode.Complete())(
    +      AddData(inputData, (1, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
    +      CheckAnswer((1, 1), (2, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
    +      CheckAnswer((1, 1), (2, 2), (3, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (1, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
    +      CheckAnswer((1, 2), (2, 2), (3, 2)))
    +  }
    +
    +  test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") {
    +    val inputData = MemoryStream[Int]
    +    inputData.addData(1, 2, 3, 4)
    +    inputData.addData(3, 4, 5, 6)
    +    inputData.addData(5, 6, 7, 8)
    +
    +    val resourceUri =
    +      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
    +    val checkpointDir = new File(resourceUri).getCanonicalPath
    +    val query = inputData
    +      .toDF()
    +      .groupBy($"value")
    +      .agg(count("*"))
    +      .writeStream
    +      .queryName("counts")
    +      .outputMode("complete")
    +      .option("checkpointLocation", checkpointDir)
    +      .format("memory")
    +
    +    // Checkpoint data was generated by a query with 10 shuffle partitions.
    +    // Test if recovery from checkpoint is successful.
    +    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
    +      query.start().processAllAvailable()
    --- End diff --
    
    Added.




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106058765
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -380,7 +387,27 @@ class StreamExecution(
             logInfo(s"Resuming streaming query, starting with batch $batchId")
             currentBatchId = batchId
             availableOffsets = nextOffsets.toStreamProgress(sources)
    -        offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
    +
    +        // initialize metadata
    +        val shufflePartitionsSparkSession: Int = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)
    +        offsetSeqMetadata = {
    +          if (nextOffsets.metadata.isEmpty) {
    +            OffsetSeqMetadata(0, 0,
    +              Map(SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsSparkSession.toString))
    +          } else {
    +            val metadata = nextOffsets.metadata.get
    +            val shufflePartitionsToUse = metadata.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, {
    +              // For backward compatibility, if # partitions was not recorded in the offset log,
    +              // then ensure it is not missing. The new value is picked up from the conf.
    +              logDebug("Number of shuffle partitions from previous run not found in checkpoint. "
    --- End diff --
    
    Make this a `logWarning` so that we can debug it. It should be printed only once, the first time the query runs after upgrading.
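
The fallback being discussed can be sketched roughly as follows. This is a minimal illustration, not the code in the patch: `RecoveredMetadataSketch`, `ShufflePartitionsFallback`, and the `warn` callback are hypothetical stand-ins for `OffsetSeqMetadata` and `logWarning`, and the second half of the warning text is invented for the example. The warning would only fire once per upgrade if the filled-in metadata is written back to the offset log with the next batch, which is what the PR title describes.

```scala
// Hypothetical stand-in for OffsetSeqMetadata; only the conf map matters here.
final case class RecoveredMetadataSketch(conf: Map[String, String] = Map.empty)

object ShufflePartitionsFallback {
  // Same key string that SQLConf.SHUFFLE_PARTITIONS.key resolves to in Spark.
  val ShufflePartitionsKey = "spark.sql.shuffle.partitions"

  /**
   * Returns metadata that is guaranteed to carry a shuffle-partition count.
   * `warn` is a placeholder for a logWarning-style callback (an assumption).
   */
  def withShufflePartitions(
      recovered: RecoveredMetadataSketch,
      sessionValue: Int,
      warn: String => Unit): RecoveredMetadataSketch = {
    if (recovered.conf.contains(ShufflePartitionsKey)) {
      // Checkpoint already records the value: keep it and stay silent.
      recovered
    } else {
      // Old checkpoint: warn, then adopt the session value.
      // The message wording after the first sentence is illustrative only.
      warn("Number of shuffle partitions from previous run not found in checkpoint. " +
        s"Using the session value $sessionValue.")
      recovered.copy(conf = recovered.conf + (ShufflePartitionsKey -> sessionValue.toString))
    }
  }
}
```

Calling `withShufflePartitions` a second time on its own result takes the first branch, so no further warning is emitted.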




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106269722
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
         query.stop()
         assert(query.exception.isEmpty)
       }
    +
    +  test("SPARK-19873: streaming aggregation with change in number of partitions") {
    +    val inputData = MemoryStream[(Int, Int)]
    +    val agg = inputData.toDS().groupBy("_1").count()
    +
    +    testStream(agg, OutputMode.Complete())(
    +      AddData(inputData, (1, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
    +      CheckAnswer((1, 1), (2, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
    +      CheckAnswer((1, 1), (2, 2), (3, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (1, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
    +      CheckAnswer((1, 2), (2, 2), (3, 2)))
    +  }
    +
    +  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
    +    var inputData: MemoryStream[Int] = null
    +    var query: DataStreamWriter[Row] = null
    +
    +    def init(): Unit = {
    +      inputData = MemoryStream[Int]
    +      inputData.addData(1, 2, 3, 4)
    +      inputData.addData(3, 4, 5, 6)
    +      inputData.addData(5, 6, 7, 8)
    +
    +      query = inputData
    +        .toDF()
    +        .groupBy($"value")
    +        .agg(count("*"))
    +        .writeStream
    +        .outputMode("complete")
    +        .format("memory")
    +    }
    +
    +    // Get an existing checkpoint generated by Spark v2.1.
    +    // v2.1 does not record # shuffle partitions in the offset metadata.
    +    val resourceUri =
    +      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
    +    val checkpointDir = new File(resourceUri)
    +
    +    // 1 - Test if recovery from the checkpoint is successful.
    +    init()
    +    withTempDir(dir => {
    --- End diff --
    
    Changed.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    @uncleGen Not sure what that means, could you please elaborate?




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106709281
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -549,9 +581,15 @@ class StreamExecution(
               cd.dataType, cd.timeZoneId)
         }
     
    +    // Reset confs to disallow change in number of partitions
    --- End diff --
    
    Why do we need to set the confs for every batch? You can set them once, after recovering `offsetSeqMetadata`.
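
A rough sketch of the suggestion, under the assumption that the query runs against its own session conf that users cannot modify while it is running. `SessionConfSketch`, `applyRecoveredConf`, and `runBatch` are hypothetical names invented for this illustration; they are not Spark APIs.

```scala
import scala.collection.mutable

object ApplyConfOnce {
  val ShufflePartitionsKey = "spark.sql.shuffle.partitions"

  /** Hypothetical stand-in for a per-query session conf. */
  final class SessionConfSketch(initial: Map[String, String]) {
    private val settings = mutable.Map.empty[String, String] ++= initial
    // Counts set() calls, only to make the difference observable.
    var writes = 0
    def set(key: String, value: String): Unit = { settings(key) = value; writes += 1 }
    def get(key: String): Option[String] = settings.get(key)
  }

  /** Called once on restart, right after the metadata has been recovered. */
  def applyRecoveredConf(recovered: Map[String, String], session: SessionConfSketch): Unit =
    recovered.get(ShufflePartitionsKey).foreach(session.set(ShufflePartitionsKey, _))

  /** Per-batch work only reads the conf; it does not reset it. */
  def runBatch(session: SessionConfSketch): Int =
    session.get(ShufflePartitionsKey).map(_.toInt).getOrElse(200) // 200 is Spark's default
}
```

With this shape the recovered value overrides whatever the session started with, and every later batch sees the same number without another write.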




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r105050897
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala ---
    @@ -71,7 +71,10 @@ object OffsetSeq {
      * @param batchTimestampMs: The current batch processing timestamp.
      * Time unit: milliseconds
      */
    -case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
    +case class OffsetSeqMetadata(
    +    var batchWatermarkMs: Long = 0,
    +    var batchTimestampMs: Long = 0,
    +    var numShufflePartitions: Int = 0) {
    --- End diff --
    
    It's better to use `conf: Map[String, String]` here because we will probably add more confs to this class in the future.
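
For illustration, the suggested shape might look like the sketch below. `OffsetSeqMetadataSketch` and `MetadataConf` are hypothetical names, and `example.future.setting` is a made-up key used only to show that a new entry needs no change to the constructor.

```scala
// Hypothetical variant of the case class in the diff above, with the dedicated
// numShufflePartitions field replaced by a general-purpose conf map.
final case class OffsetSeqMetadataSketch(
    batchWatermarkMs: Long = 0,
    batchTimestampMs: Long = 0,
    conf: Map[String, String] = Map.empty)

object MetadataConf {
  // Same key string that SQLConf.SHUFFLE_PARTITIONS.key resolves to in Spark.
  val ShufflePartitionsKey = "spark.sql.shuffle.partitions"

  /** Typed accessor; None when an older checkpoint did not record the value. */
  def shufflePartitions(metadata: OffsetSeqMetadataSketch): Option[Int] =
    metadata.conf.get(ShufflePartitionsKey).map(_.toInt)
}

object MetadataConfExample {
  val recorded = OffsetSeqMetadataSketch(conf = Map(MetadataConf.ShufflePartitionsKey -> "10"))

  // Recording another setting later only adds a map entry.
  val extended = recorded.copy(conf = recorded.conf + ("example.future.setting" -> "true"))
}
```

The trade-off is that values are stored as strings, so each reader has to parse and validate the entries it cares about.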




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74760/
    Test FAILed.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    @zsxwing Will change cloning of listener manager in a new PR.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74564 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74564/testReport)** for PR 17216 at commit [`5c851a5`](https://github.com/apache/spark/commit/5c851a5ce094867873392a4e5071a8efe47e2801).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by lw-lin <gi...@git.apache.org>.
Github user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r105081023
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala ---
    @@ -71,7 +71,10 @@ object OffsetSeq {
      * @param batchTimestampMs: The current batch processing timestamp.
      * Time unit: milliseconds
      */
    -case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
    +case class OffsetSeqMetadata(
    +    var batchWatermarkMs: Long = 0,
    +    var batchTimestampMs: Long = 0,
    +    var numShufflePartitions: Int = 0) {
    --- End diff --
    
    Hi @kunalkhamar, in case you update `OffsetSeq`'s log version number, the work being done in #17070 might be helpful.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    LGTM, with a few comments on the tests.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74353/
    Test PASSed.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74352/
    Test PASSed.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r126029620
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
         query.stop()
         assert(query.exception.isEmpty)
       }
    +
    +  test("SPARK-19873: streaming aggregation with change in number of partitions") {
    +    val inputData = MemoryStream[(Int, Int)]
    +    val agg = inputData.toDS().groupBy("_1").count()
    +
    +    testStream(agg, OutputMode.Complete())(
    +      AddData(inputData, (1, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
    +      CheckAnswer((1, 1), (2, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
    +      CheckAnswer((1, 1), (2, 2), (3, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (1, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
    +      CheckAnswer((1, 2), (2, 2), (3, 2)))
    +  }
    +
    +  test("recover from a Spark v2.1 checkpoint") {
    +    var inputData: MemoryStream[Int] = null
    +    var query: DataStreamWriter[Row] = null
    +
    +    def prepareMemoryStream(): Unit = {
    +      inputData = MemoryStream[Int]
    +      inputData.addData(1, 2, 3, 4)
    +      inputData.addData(3, 4, 5, 6)
    +      inputData.addData(5, 6, 7, 8)
    +
    +      query = inputData
    +        .toDF()
    +        .groupBy($"value")
    +        .agg(count("*"))
    +        .writeStream
    +        .outputMode("complete")
    +        .format("memory")
    +    }
    +
    +    // Get an existing checkpoint generated by Spark v2.1.
    +    // v2.1 does not record # shuffle partitions in the offset metadata.
    +    val resourceUri =
    +      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
    +    val checkpointDir = new File(resourceUri)
    +
    +    // 1 - Test if recovery from the checkpoint is successful.
    +    prepareMemoryStream()
    +    withTempDir { dir =>
    +      // Copy the checkpoint to a temp dir to prevent changes to the original.
    +      // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
    +      FileUtils.copyDirectory(checkpointDir, dir)
    +
    +      // Checkpoint data was generated by a query with 10 shuffle partitions.
    +      // In order to test reading from the checkpoint, the checkpoint must have two or more batches,
    +      // since the last batch may be rerun.
    --- End diff --
    
    https://github.com/apache/spark/pull/18503#discussion_r126028838




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106267528
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -437,25 +464,28 @@ class StreamExecution(
           }
         }
         if (hasNewData) {
    -      // Current batch timestamp in milliseconds
    -      offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
    +      var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
           // Update the eventTime watermark if we find one in the plan.
           if (lastExecution != null) {
             lastExecution.executedPlan.collect {
               case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
                 logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
                 e.eventTimeStats.value.max - e.delayMs
             }.headOption.foreach { newWatermarkMs =>
    -          if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
    +          if (newWatermarkMs > batchWatermarkMs) {
                 logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
    -            offsetSeqMetadata.batchWatermarkMs = newWatermarkMs
    +            batchWatermarkMs = newWatermarkMs
               } else {
                 logDebug(
                   s"Event time didn't move: $newWatermarkMs < " +
    -                s"${offsetSeqMetadata.batchWatermarkMs}")
    +                s"$batchWatermarkMs")
               }
             }
           }
    +      offsetSeqMetadata = OffsetSeqMetadata(
    --- End diff --
    
    Good point, changed.
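
The diff above stops mutating `offsetSeqMetadata` field by field: it tracks the watermark in a local variable and then builds a new metadata value. A minimal sketch of that pattern follows, in plain Scala with no Spark dependency. `BatchMetadata` and `WatermarkUpdate` are hypothetical names, and the field list is an assumption because the quoted diff is cut off where the new value is constructed.

```scala
// Hypothetical stand-in for the metadata written to the offset log.
final case class BatchMetadata(
    batchWatermarkMs: Long = 0L,
    batchTimestampMs: Long = 0L,
    conf: Map[String, String] = Map.empty)

object WatermarkUpdate {
  /** Returns the metadata for the next batch; the watermark never moves backwards. */
  def next(
      previous: BatchMetadata,
      observedWatermarkMs: Option[Long],
      nowMs: Long): BatchMetadata = {
    // Start from the previous watermark and only advance it.
    val watermarkMs = observedWatermarkMs
      .filter(_ > previous.batchWatermarkMs)
      .getOrElse(previous.batchWatermarkMs)
    // Build a fresh immutable value instead of assigning fields one by one.
    previous.copy(batchWatermarkMs = watermarkMs, batchTimestampMs = nowMs)
  }
}
```

Building the value in one step means a reader of the metadata never sees a state where the timestamp has been updated but the watermark has not.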




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74758/




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74290 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74290/testReport)** for PR 17216 at commit [`60ec7da`](https://github.com/apache/spark/commit/60ec7da08ea9b55d6f307c894e0dbd0a116d9413).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74757/




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    @zsxwing @uncleGen @lw-lin 
    This is ready for another review. Can you please take a look when you get a chance?




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106042959
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
         query.stop()
         assert(query.exception.isEmpty)
       }
    +
    +  test("SPARK-19873: streaming aggregation with change in number of partitions") {
    +    val inputData = MemoryStream[(Int, Int)]
    +    val agg = inputData.toDS().groupBy("_1").count()
    +
    +    testStream(agg, OutputMode.Complete())(
    +      AddData(inputData, (1, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
    +      CheckAnswer((1, 1), (2, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
    +      CheckAnswer((1, 1), (2, 2), (3, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (1, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
    +      CheckAnswer((1, 2), (2, 2), (3, 2)))
    +  }
    +
    +  test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") {
    +    val inputData = MemoryStream[Int]
    +    inputData.addData(1, 2, 3, 4)
    +    inputData.addData(3, 4, 5, 6)
    +    inputData.addData(5, 6, 7, 8)
    +
    +    val resourceUri =
    +      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
    +    val checkpointDir = new File(resourceUri).getCanonicalPath
    +    val query = inputData
    +      .toDF()
    +      .groupBy($"value")
    +      .agg(count("*"))
    +      .writeStream
    +      .queryName("counts")
    +      .outputMode("complete")
    +      .option("checkpointLocation", checkpointDir)
    +      .format("memory")
    +
    +    // Checkpoint data was generated by a query with 10 shuffle partitions.
    +    // Test if recovery from checkpoint is successful.
    +    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
    +      query.start().processAllAvailable()
    +
    +      QueryTest.checkAnswer(spark.table("counts").toDF(),
    +        Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
    +        Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Nil)
    +    }
    --- End diff --
    
    Added `try .. finally`
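
The first test in the diff above restarts the same aggregation with 2, 5 and 1 shuffle partitions. A rough sketch of why a changed count can matter, assuming aggregation state is kept per shuffle partition: the partition a key lands in depends on the partition count, so after a change a key may be routed to a partition that does not hold its previous state. `PartitionSketch` is a hypothetical helper, and the plain `hashCode` modulo is a simplification rather than the hash function Spark uses.

```scala
object PartitionSketch {
  /** Non-negative modulo, so negative hash codes still map to a valid partition. */
  def partitionFor(key: Int, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  /** Keys whose partition differs between two partition counts. */
  def movedKeys(keys: Seq[Int], before: Int, after: Int): Seq[Int] =
    keys.filter(k => partitionFor(k, before) != partitionFor(k, after))
}
```

With the keys from the test, `PartitionSketch.movedKeys(Seq(1, 2, 3), before = 2, after = 5)` reports which keys would change partition under this simplified hashing.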




[GitHub] spark issue #17216: [SPARK-19873][SS][WIP] Record num shuffle partitions in ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74224 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74224/testReport)** for PR 17216 at commit [`12f5fd3`](https://github.com/apache/spark/commit/12f5fd30229e441355a05290ed124263c1429acc).




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74353 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74353/testReport)** for PR 17216 at commit [`1cacd32`](https://github.com/apache/spark/commit/1cacd32e21f074c2ffd79f1ce4390bd9f113da0c).




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74754 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74754/testReport)** for PR 17216 at commit [`3ae4414`](https://github.com/apache/spark/commit/3ae44148913dfab274dc527f7c044070a39f1f61).




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74561 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74561/testReport)** for PR 17216 at commit [`030e635`](https://github.com/apache/spark/commit/030e6353abbf084bc62eea64caf494c08ebb294e).




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74757 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74757/testReport)** for PR 17216 at commit [`a2b32ce`](https://github.com/apache/spark/commit/a2b32ce3ee536d1ea1d12ae4dcc3c561788e3dd2).




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74758 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74758/testReport)** for PR 17216 at commit [`3abe0a0`](https://github.com/apache/spark/commit/3abe0a0fcb60def5eb14697fa6f78eee318b77b0).




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74344 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74344/testReport)** for PR 17216 at commit [`f6bd071`](https://github.com/apache/spark/commit/f6bd071ca794ed4532eef936b69a54f5d2d68cf1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106724958
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala ---
    @@ -29,12 +30,32 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
       case class StringOffset(override val json: String) extends Offset
     
       test("OffsetSeqMetadata - deserialization") {
    -    assert(OffsetSeqMetadata(0, 0) === OffsetSeqMetadata("""{}"""))
    -    assert(OffsetSeqMetadata(1, 0) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
    -    assert(OffsetSeqMetadata(0, 2) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
    -    assert(
    -      OffsetSeqMetadata(1, 2) ===
    -        OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
    +    val key = SQLConf.SHUFFLE_PARTITIONS.key
    +
    +    def getConfWith(shufflePartitions: Int): Map[String, String] = {
    +      Map(key -> shufflePartitions.toString)
    +    }
    +
    +    // None set
    +    assert(OffsetSeqMetadata(0, 0, Map.empty) === OffsetSeqMetadata("""{}"""))
    +
    +    // One set
    +    assert(OffsetSeqMetadata(1, 0, Map.empty) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
    +    assert(OffsetSeqMetadata(0, 2, Map.empty) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
    +    assert(OffsetSeqMetadata(0, 0, getConfWith(shufflePartitions = 2)) ===
    +      OffsetSeqMetadata(s"""{"conf": {"$key":2}}"""))
    +
    +    // Two set
    +    assert(OffsetSeqMetadata(1, 2, Map.empty) ===
    +      OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
    +    assert(OffsetSeqMetadata(1, 0, getConfWith(shufflePartitions = 3)) ===
    +      OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"conf": {"$key":3}}"""))
    +    assert(OffsetSeqMetadata(0, 2, getConfWith(shufflePartitions = 3)) ===
    +      OffsetSeqMetadata(s"""{"batchTimestampMs":2,"conf": {"$key":3}}"""))
    +
    +    // All set
    +    assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) ===
    +      OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": {"$key":3}}"""))
    --- End diff --
    
    Added.
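
The deserialization test above checks that every field missing from the JSON falls back to a default (`0`, `0`, `Map.empty`), which is what lets offset logs written before this change still load. The sketch below shows the same fallback rule in plain Scala. It skips JSON parsing and starts from already-extracted fields; `MetadataSketch` and `MetadataReader` are hypothetical names, not the classes in the patch.

```scala
// Hypothetical mirror of the three metadata fields exercised by the test.
final case class MetadataSketch(
    batchWatermarkMs: Long = 0L,
    batchTimestampMs: Long = 0L,
    conf: Map[String, String] = Map.empty)

object MetadataReader {
  /** Builds metadata from already-parsed fields; absent keys keep their defaults. */
  def fromFields(
      longs: Map[String, Long],
      conf: Option[Map[String, String]]): MetadataSketch =
    MetadataSketch(
      batchWatermarkMs = longs.getOrElse("batchWatermarkMs", 0L),
      batchTimestampMs = longs.getOrElse("batchTimestampMs", 0L),
      // An old log entry has no "conf" object at all, so treat it as empty.
      conf = conf.getOrElse(Map.empty))
}
```

An empty input, the equivalent of `{}`, then yields `MetadataSketch()`, matching the "None set" case in the test.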




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r105781913
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
         query.stop()
         assert(query.exception.isEmpty)
       }
    +
    +  test("SPARK-19873: streaming aggregation with change in number of partitions") {
    +    val inputData = MemoryStream[(Int, Int)]
    +    val agg = inputData.toDS().groupBy("_1").count()
    +
    +    testStream(agg, OutputMode.Complete())(
    +      AddData(inputData, (1, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
    +      CheckAnswer((1, 1), (2, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
    +      CheckAnswer((1, 1), (2, 2), (3, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (1, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
    +      CheckAnswer((1, 2), (2, 2), (3, 2)))
    +  }
    +
    +  test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") {
    +    val inputData = MemoryStream[Int]
    +    inputData.addData(1, 2, 3, 4)
    +    inputData.addData(3, 4, 5, 6)
    +    inputData.addData(5, 6, 7, 8)
    +
    +    val resourceUri =
    +      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
    +    val checkpointDir = new File(resourceUri).getCanonicalPath
    +    val query = inputData
    +      .toDF()
    +      .groupBy($"value")
    +      .agg(count("*"))
    +      .writeStream
    +      .queryName("counts")
    +      .outputMode("complete")
    +      .option("checkpointLocation", checkpointDir)
    +      .format("memory")
    +
    +    // Checkpoint data was generated by a query with 10 shuffle partitions.
    +    // Test if recovery from checkpoint is successful.
    +    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
    +      query.start().processAllAvailable()
    +
    +      QueryTest.checkAnswer(spark.table("counts").toDF(),
    +        Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
    +        Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Nil)
    +    }
    --- End diff --
    
    You don't seem to stop the query. It would be good to put a `try .. finally` within the `withSQLConf` to stop the query; otherwise a failure here can cascade into later tests.
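
A minimal sketch of the `try .. finally` shape suggested here, in plain Scala so it runs without Spark. `StoppableQuery` and `QueryCleanup.withStartedQuery` are hypothetical stand-ins for the streaming query returned by `query.start()`; only the cleanup structure is the point.

```scala
// Hypothetical stand-in for a running streaming query.
trait StoppableQuery {
  def processAllAvailable(): Unit
  def stop(): Unit
}

object QueryCleanup {
  /** Runs `body` against a started query and always stops it afterwards. */
  def withStartedQuery[T](start: () => StoppableQuery)(body: StoppableQuery => T): T = {
    val query = start()
    try {
      query.processAllAvailable()
      body(query)
    } finally {
      // Runs even when an assertion in `body` throws, so no query is left running.
      query.stop()
    }
  }
}
```

In the test above, the `checkAnswer` call would go in the `body` position, so a failed assertion still stops the query before the next test starts.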




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74561 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74561/testReport)** for PR 17216 at commit [`030e635`](https://github.com/apache/spark/commit/030e6353abbf084bc62eea64caf494c08ebb294e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74226/




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Seems like an unrelated failure. Probably a flaky test.
    
    On Mar 17, 2017 4:23 PM, "UCB AMPLab" <no...@github.com> wrote:
    
    > Merged build finished. Test FAILed.





[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    LGTM. Just a few nits. 




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r105781135
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
         query.stop()
         assert(query.exception.isEmpty)
       }
    +
    +  test("SPARK-19873: streaming aggregation with change in number of partitions") {
    +    val inputData = MemoryStream[(Int, Int)]
    +    val agg = inputData.toDS().groupBy("_1").count()
    +
    +    testStream(agg, OutputMode.Complete())(
    +      AddData(inputData, (1, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
    +      CheckAnswer((1, 1), (2, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
    +      CheckAnswer((1, 1), (2, 2), (3, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (1, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
    +      CheckAnswer((1, 2), (2, 2), (3, 2)))
    +  }
    +
    +  test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") {
    +    val inputData = MemoryStream[Int]
    +    inputData.addData(1, 2, 3, 4)
    +    inputData.addData(3, 4, 5, 6)
    +    inputData.addData(5, 6, 7, 8)
    +
    +    val resourceUri =
    +      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
    +    val checkpointDir = new File(resourceUri).getCanonicalPath
    +    val query = inputData
    +      .toDF()
    +      .groupBy($"value")
    +      .agg(count("*"))
    +      .writeStream
    +      .queryName("counts")
    +      .outputMode("complete")
    +      .option("checkpointLocation", checkpointDir)
    +      .format("memory")
    +
    +    // Checkpoint data was generated by a query with 10 shuffle partitions.
    +    // Test if recovery from checkpoint is successful.
    +    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
    +      query.start().processAllAvailable()
    +
    +      QueryTest.checkAnswer(spark.table("counts").toDF(),
    +        Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
    +        Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Nil)
    +    }
    +
    +    // If the number of partitions is greater, should throw exception.
    +    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") {
    --- End diff --
    
    Can you check whether the returned message is useful?
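
One way to act on this is to capture the exception and assert on its text, not only on its type. The sketch below is plain Scala; `MessageCheck.messageOf` is a hypothetical helper, and the exception type and message wording in the usage note are assumptions, since the quoted diff does not show what the patch throws.

```scala
import scala.reflect.{ClassTag, classTag}
import scala.util.{Failure, Success, Try}

object MessageCheck {
  /** Runs `body`, expects it to throw an `E`, and returns that exception's message. */
  def messageOf[E <: Throwable: ClassTag](body: => Any): String =
    Try(body) match {
      case Failure(e) if classTag[E].runtimeClass.isInstance(e) =>
        // A null message becomes "" so callers can use `contains` safely.
        Option(e.getMessage).getOrElse("")
      case Failure(other) =>
        // An unexpected exception type is rethrown unchanged.
        throw other
      case Success(_) =>
        throw new AssertionError("expected an exception, but the block completed normally")
    }
}
```

A test could then check that the message names the offending setting, for example that it contains `spark.sql.shuffle.partitions` and the rejected value, so a user can tell what to change.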




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74617/
    Test PASSed.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74617 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74617/testReport)** for PR 17216 at commit [`dfae7be`](https://github.com/apache/spark/commit/dfae7beeedc074a2806fd2b1cd9d4652006d9e89).




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74344/
    Test PASSed.




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r105792431
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala ---
    @@ -70,13 +69,16 @@ object OffsetSeq {
      * bound the lateness of data that will processed. Time unit: milliseconds
      * @param batchTimestampMs: The current batch processing timestamp.
      * Time unit: milliseconds
    + * @param conf: Additional conf_s to be persisted across batches, e.g. number of shuffle partitions.
      */
    -case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
    +case class OffsetSeqMetadata(
    +    var batchWatermarkMs: Long = 0,
    --- End diff --
    
    Changed to vals.
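
As a rough illustration of what moving from `var` to `val` fields implies for callers, the sketch below updates an immutable case class through `copy` instead of assigning to its fields. `BatchMetadata` and `advance` are hypothetical, simplified stand-ins and not the actual `OffsetSeqMetadata` code.

```scala
// Hypothetical, simplified stand-in for the metadata case class under review.
final case class BatchMetadata(
    batchWatermarkMs: Long = 0L,
    batchTimestampMs: Long = 0L,
    conf: Map[String, String] = Map.empty)

object BatchMetadataDemo {
  // Returns a new instance for the next batch; the input is left untouched.
  // The watermark never moves backwards in this sketch.
  def advance(m: BatchMetadata, watermarkMs: Long, nowMs: Long): BatchMetadata =
    m.copy(
      batchWatermarkMs = math.max(m.batchWatermarkMs, watermarkMs),
      batchTimestampMs = nowMs)

  def main(args: Array[String]): Unit = {
    val first = BatchMetadata(conf = Map("spark.sql.shuffle.partitions" -> "10"))
    val second = advance(first, watermarkMs = 5L, nowMs = 100L)
    assert(first.batchTimestampMs == 0L) // original is unchanged
    assert(second == BatchMetadata(5L, 100L, first.conf))
  }
}
```

With immutable fields, the value recorded for one batch cannot be altered by later code that happens to hold a reference to it.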




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74617 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74617/testReport)** for PR 17216 at commit [`dfae7be`](https://github.com/apache/spark/commit/dfae7beeedc074a2806fd2b1cd9d4652006d9e89).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74760 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74760/testReport)** for PR 17216 at commit [`a0c71af`](https://github.com/apache/spark/commit/a0c71af4ad2dd99815f85e3b0c842419beeea67c).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106269742
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
         query.stop()
         assert(query.exception.isEmpty)
       }
    +
    +  test("SPARK-19873: streaming aggregation with change in number of partitions") {
    +    val inputData = MemoryStream[(Int, Int)]
    +    val agg = inputData.toDS().groupBy("_1").count()
    +
    +    testStream(agg, OutputMode.Complete())(
    +      AddData(inputData, (1, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
    +      CheckAnswer((1, 1), (2, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
    +      CheckAnswer((1, 1), (2, 2), (3, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (1, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
    +      CheckAnswer((1, 2), (2, 2), (3, 2)))
    +  }
    +
    +  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
    +    var inputData: MemoryStream[Int] = null
    +    var query: DataStreamWriter[Row] = null
    +
    +    def init(): Unit = {
    +      inputData = MemoryStream[Int]
    +      inputData.addData(1, 2, 3, 4)
    +      inputData.addData(3, 4, 5, 6)
    +      inputData.addData(5, 6, 7, 8)
    +
    +      query = inputData
    +        .toDF()
    +        .groupBy($"value")
    +        .agg(count("*"))
    +        .writeStream
    +        .outputMode("complete")
    +        .format("memory")
    +    }
    +
    +    // Get an existing checkpoint generated by Spark v2.1.
    +    // v2.1 does not record # shuffle partitions in the offset metadata.
    +    val resourceUri =
    +      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
    +    val checkpointDir = new File(resourceUri)
    +
    +    // 1 - Test if recovery from the checkpoint is successful.
    +    init()
    +    withTempDir(dir => {
    +      // Copy the checkpoint to a temp dir to prevent changes to the original.
    +      // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
    +      FileUtils.copyDirectory(checkpointDir, dir)
    +
    +      // Checkpoint data was generated by a query with 10 shuffle partitions.
    +      // In order to test reading from the checkpoint, the checkpoint must have two or more batches,
    +      // since the last batch may be rerun.
    +      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
    +        var streamingQuery: StreamingQuery = null
    +        try {
    +          streamingQuery =
    +            query.queryName("counts").option("checkpointLocation", dir.getCanonicalPath).start()
    +          streamingQuery.processAllAvailable()
    +          inputData.addData(9)
    +          streamingQuery.processAllAvailable()
    +
    +          QueryTest.checkAnswer(spark.table("counts").toDF(),
    +            Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
    +            Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil)
    +        } finally {
    +          if (streamingQuery ne null) {
    +            streamingQuery.stop()
    +          }
    +        }
    +      }
    +    })
    +
    +    // 2 - Check recovery with wrong num shuffle partitions
    +    init()
    +    withTempDir(dir => {
    --- End diff --
    
    Changed.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74561/
    Test PASSed.




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106709791
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala ---
    @@ -29,12 +30,32 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
       case class StringOffset(override val json: String) extends Offset
     
       test("OffsetSeqMetadata - deserialization") {
    -    assert(OffsetSeqMetadata(0, 0) === OffsetSeqMetadata("""{}"""))
    -    assert(OffsetSeqMetadata(1, 0) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
    -    assert(OffsetSeqMetadata(0, 2) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
    -    assert(
    -      OffsetSeqMetadata(1, 2) ===
    -        OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
    +    val key = SQLConf.SHUFFLE_PARTITIONS.key
    +
    +    def getConfWith(shufflePartitions: Int): Map[String, String] = {
    +      Map(key -> shufflePartitions.toString)
    +    }
    +
    +    // None set
    +    assert(OffsetSeqMetadata(0, 0, Map.empty) === OffsetSeqMetadata("""{}"""))
    +
    +    // One set
    +    assert(OffsetSeqMetadata(1, 0, Map.empty) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
    +    assert(OffsetSeqMetadata(0, 2, Map.empty) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
    +    assert(OffsetSeqMetadata(0, 0, getConfWith(shufflePartitions = 2)) ===
    +      OffsetSeqMetadata(s"""{"conf": {"$key":2}}"""))
    +
    +    // Two set
    +    assert(OffsetSeqMetadata(1, 2, Map.empty) ===
    +      OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
    +    assert(OffsetSeqMetadata(1, 0, getConfWith(shufflePartitions = 3)) ===
    +      OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"conf": {"$key":3}}"""))
    +    assert(OffsetSeqMetadata(0, 2, getConfWith(shufflePartitions = 3)) ===
    +      OffsetSeqMetadata(s"""{"batchTimestampMs":2,"conf": {"$key":3}}"""))
    +
    +    // All set
    +    assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) ===
    +      OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": {"$key":3}}"""))
    --- End diff --
    
    nit: could you add a test to verify that unknown fields don't break deserialization? For example:
    ```Scala
        assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) ===
          OffsetSeqMetadata(
            s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": {"$key":3},"unknown":1}"""))
    ```
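
For readers who want to try the idea outside the Spark test suite, here is a minimal standalone sketch. It assumes json4s (with the Jackson backend) is on the classpath and that its default, non-strict extraction ignores fields the target case class does not declare. `MetadataSketch` is a hypothetical stand-in, not Spark's `OffsetSeqMetadata`.

```scala
import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization

// Hypothetical stand-in for the metadata class; defaults cover missing fields.
case class MetadataSketch(
    batchWatermarkMs: Long = 0,
    batchTimestampMs: Long = 0,
    conf: Map[String, String] = Map.empty)

object UnknownFieldCheck {
  implicit val formats: Formats = Serialization.formats(NoTypeHints)

  // Deserializes a JSON string into the sketch class.
  def parse(json: String): MetadataSketch = Serialization.read[MetadataSketch](json)

  def main(args: Array[String]): Unit = {
    // "unknown" is not a field of MetadataSketch and is expected to be ignored.
    val json =
      """{"batchWatermarkMs":1,"batchTimestampMs":2,"conf":{"k":"3"},"unknown":1}"""
    assert(parse(json) == MetadataSketch(1, 2, Map("k" -> "3")))
    // An empty object falls back to the declared defaults.
    assert(parse("{}") == MetadataSketch())
  }
}
```

Tolerating unknown fields matters here because an older Spark version may need to read a log written by a newer one that records additional metadata.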




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74708/
    Test PASSed.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74352 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74352/testReport)** for PR 17216 at commit [`3af1cb4`](https://github.com/apache/spark/commit/3af1cb4ab978e570da86075264358cc524650bf6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74564 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74564/testReport)** for PR 17216 at commit [`5c851a5`](https://github.com/apache/spark/commit/5c851a5ce094867873392a4e5071a8efe47e2801).




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106059096
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
         query.stop()
         assert(query.exception.isEmpty)
       }
    +
    +  test("SPARK-19873: streaming aggregation with change in number of partitions") {
    +    val inputData = MemoryStream[(Int, Int)]
    +    val agg = inputData.toDS().groupBy("_1").count()
    +
    +    testStream(agg, OutputMode.Complete())(
    +      AddData(inputData, (1, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
    +      CheckAnswer((1, 1), (2, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
    +      CheckAnswer((1, 1), (2, 2), (3, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (1, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
    +      CheckAnswer((1, 2), (2, 2), (3, 2)))
    +  }
    +
    +  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
    +    var inputData: MemoryStream[Int] = null
    +    var query: DataStreamWriter[Row] = null
    +
    +    def init(): Unit = {
    +      inputData = MemoryStream[Int]
    +      inputData.addData(1, 2, 3, 4)
    +      inputData.addData(3, 4, 5, 6)
    +      inputData.addData(5, 6, 7, 8)
    +
    +      query = inputData
    +        .toDF()
    +        .groupBy($"value")
    +        .agg(count("*"))
    +        .writeStream
    +        .outputMode("complete")
    +        .format("memory")
    +    }
    +
    +    // Get an existing checkpoint generated by Spark v2.1.
    +    // v2.1 does not record # shuffle partitions in the offset metadata.
    +    val resourceUri =
    +      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
    +    val checkpointDir = new File(resourceUri)
    +
    +    // 1 - Test if recovery from the checkpoint is successful.
    +    init()
    --- End diff --
    
    nit: init -> prepareMemoryStream




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    LGTM. Will merge after tests pass.




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106266808
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -380,7 +387,27 @@ class StreamExecution(
             logInfo(s"Resuming streaming query, starting with batch $batchId")
             currentBatchId = batchId
             availableOffsets = nextOffsets.toStreamProgress(sources)
    -        offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
    +
    +        // initialize metadata
    +        val shufflePartitionsSparkSession: Int = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)
    +        offsetSeqMetadata = {
    +          if (nextOffsets.metadata.isEmpty) {
    +            OffsetSeqMetadata(0, 0,
    --- End diff --
    
    Changed.
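
The recovery logic under discussion can be pictured with a small standalone sketch. It reflects one reading of the PR title ("record ... and enforce in next batch"): a value recorded in the offset log wins, and the session setting is used only when nothing was recorded, as with a checkpoint that predates this change. `ShufflePartitionsSketch` and `resolve` are hypothetical names; the real code lives in `StreamExecution` and may differ.

```scala
object ShufflePartitionsSketch {
  // Config key named in the PR description.
  val Key = "spark.sql.shuffle.partitions"

  // recordedConf: the conf map read from the last offset log entry, if any.
  // sessionValue: the value currently configured on the session.
  def resolve(recordedConf: Option[Map[String, String]], sessionValue: Int): Int =
    recordedConf
      .flatMap(_.get(Key)) // value persisted by an earlier batch, if present
      .map(_.toInt)
      .getOrElse(sessionValue) // nothing recorded: fall back to the session

  def main(args: Array[String]): Unit = {
    assert(resolve(None, 10) == 10)                        // no metadata at all
    assert(resolve(Some(Map.empty), 15) == 15)             // metadata without the key
    assert(resolve(Some(Map(Key -> "10")), 15) == 10)      // recorded value is enforced
  }
}
```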




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74754/
    Test PASSed.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74708 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74708/testReport)** for PR 17216 at commit [`4733b4e`](https://github.com/apache/spark/commit/4733b4e160bff010521319a1aa61e4f7981c65d6).




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74226 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74226/testReport)** for PR 17216 at commit [`9ff4d29`](https://github.com/apache/spark/commit/9ff4d2956bbcf6ca65888cc616efac8ffeea733b).




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74352 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74352/testReport)** for PR 17216 at commit [`3af1cb4`](https://github.com/apache/spark/commit/3af1cb4ab978e570da86075264358cc524650bf6).




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74757 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74757/testReport)** for PR 17216 at commit [`a2b32ce`](https://github.com/apache/spark/commit/a2b32ce3ee536d1ea1d12ae4dcc3c561788e3dd2).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74290 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74290/testReport)** for PR 17216 at commit [`60ec7da`](https://github.com/apache/spark/commit/60ec7da08ea9b55d6f307c894e0dbd0a116d9413).




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106059371
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
         query.stop()
         assert(query.exception.isEmpty)
       }
    +
    +  test("SPARK-19873: streaming aggregation with change in number of partitions") {
    +    val inputData = MemoryStream[(Int, Int)]
    +    val agg = inputData.toDS().groupBy("_1").count()
    +
    +    testStream(agg, OutputMode.Complete())(
    +      AddData(inputData, (1, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
    +      CheckAnswer((1, 1), (2, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
    +      CheckAnswer((1, 1), (2, 2), (3, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (1, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
    +      CheckAnswer((1, 2), (2, 2), (3, 2)))
    +  }
    +
    +  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
    --- End diff --
    
    This test is not directly tied to SPARK-19873, so remove the SPARK-19873 prefix. Name it just "recover from a Spark v2.1 checkpoint".


[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r105312919
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala ---
    @@ -71,7 +71,10 @@ object OffsetSeq {
      * @param batchTimestampMs: The current batch processing timestamp.
      * Time unit: milliseconds
      */
    -case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
    +case class OffsetSeqMetadata(
    +    var batchWatermarkMs: Long = 0,
    +    var batchTimestampMs: Long = 0,
    +    var numShufflePartitions: Int = 0) {
    --- End diff --
    
    @lw-lin Hi Liwei!
    Thanks for letting me know. We will not be updating the log version number, since this patch preserves both backward and forward compatibility.
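The compatibility argument can be illustrated with a minimal sketch. It assumes a simplified `key=value` metadata line rather than the real serialized offset log, and `MetadataCompat`, `Metadata`, and `parse` are hypothetical names, not Spark APIs. The idea is that a reader which ignores unknown keys and defaults missing ones can handle both older and newer entries without a version bump.

```scala
// Hypothetical, simplified model of offset-log metadata.
// This is NOT Spark's OffsetSeqMetadata or its real serialization format.
object MetadataCompat {
  final case class Metadata(
      batchWatermarkMs: Long = 0L,
      batchTimestampMs: Long = 0L,
      numShufflePartitions: Option[Int] = None)

  // Parses comma-separated "key=value" pairs.
  // Unknown keys are ignored, so entries from a newer writer still parse.
  // Missing keys fall back to defaults, so entries from an older writer still parse.
  def parse(line: String): Metadata = {
    val fields: Map[String, String] = line
      .split(",")
      .map(_.trim)
      .filter(_.contains("="))
      .map { kv =>
        val Array(k, v) = kv.split("=", 2)
        k.trim -> v.trim
      }
      .toMap

    Metadata(
      batchWatermarkMs = fields.get("batchWatermarkMs").map(_.toLong).getOrElse(0L),
      batchTimestampMs = fields.get("batchTimestampMs").map(_.toLong).getOrElse(0L),
      numShufflePartitions = fields.get("numShufflePartitions").map(_.toInt))
  }
}
```

An entry without `numShufflePartitions` yields `None`, which the caller can replace with the value from the session conf.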


[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74224 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74224/testReport)** for PR 17216 at commit [`12f5fd3`](https://github.com/apache/spark/commit/12f5fd30229e441355a05290ed124263c1429acc).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class OffsetSeqMetadata(`


[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106759443
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -256,6 +259,15 @@ class StreamExecution(
           updateStatusMessage("Initializing sources")
           // force initialization of the logical plan so that the sources can be created
           logicalPlan
    +
    +      // Isolated spark session to run the batches with.
    +      val sparkSessionToRunBatches = sparkSession.cloneSession()
    +      // Adaptive execution can change num shuffle partitions, disallow
    +      sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
    +      offsetSeqMetadata = OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0,
    --- End diff --
    
    Yeah, this should be kept. It should use the conf in the cloned session.


[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106043087
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
         query.stop()
         assert(query.exception.isEmpty)
       }
    +
    +  test("SPARK-19873: streaming aggregation with change in number of partitions") {
    +    val inputData = MemoryStream[(Int, Int)]
    +    val agg = inputData.toDS().groupBy("_1").count()
    +
    +    testStream(agg, OutputMode.Complete())(
    +      AddData(inputData, (1, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
    +      CheckAnswer((1, 1), (2, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
    +      CheckAnswer((1, 1), (2, 2), (3, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (1, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
    +      CheckAnswer((1, 2), (2, 2), (3, 2)))
    +  }
    +
    +  test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") {
    +    val inputData = MemoryStream[Int]
    +    inputData.addData(1, 2, 3, 4)
    +    inputData.addData(3, 4, 5, 6)
    +    inputData.addData(5, 6, 7, 8)
    +
    +    val resourceUri =
    --- End diff --
    
    Added more comments.


[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r105310054
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -380,7 +382,20 @@ class StreamExecution(
             logInfo(s"Resuming streaming query, starting with batch $batchId")
             currentBatchId = batchId
             availableOffsets = nextOffsets.toStreamProgress(sources)
    -        offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
    +        val numShufflePartitionsFromConf = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)
    +        offsetSeqMetadata = nextOffsets
    +          .metadata
    +          .getOrElse(OffsetSeqMetadata(0, 0, numShufflePartitionsFromConf))
    +
    +        /*
    +         * For backwards compatibility, if # partitions was not recorded in the offset log, then
    +         * ensure it is non-zero. The new value is picked up from the conf.
    +         */
    --- End diff --
    
    Changed.


[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106033678
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
         query.stop()
         assert(query.exception.isEmpty)
       }
    +
    +  test("SPARK-19873: streaming aggregation with change in number of partitions") {
    +    val inputData = MemoryStream[(Int, Int)]
    +    val agg = inputData.toDS().groupBy("_1").count()
    +
    +    testStream(agg, OutputMode.Complete())(
    +      AddData(inputData, (1, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
    +      CheckAnswer((1, 1), (2, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
    +      CheckAnswer((1, 1), (2, 2), (3, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (1, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
    +      CheckAnswer((1, 2), (2, 2), (3, 2)))
    +  }
    +
    +  test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") {
    +    val inputData = MemoryStream[Int]
    +    inputData.addData(1, 2, 3, 4)
    +    inputData.addData(3, 4, 5, 6)
    +    inputData.addData(5, 6, 7, 8)
    +
    +    val resourceUri =
    +      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
    +    val checkpointDir = new File(resourceUri).getCanonicalPath
    +    val query = inputData
    +      .toDF()
    +      .groupBy($"value")
    +      .agg(count("*"))
    +      .writeStream
    +      .queryName("counts")
    +      .outputMode("complete")
    +      .option("checkpointLocation", checkpointDir)
    +      .format("memory")
    +
    +    // Checkpoint data was generated by a query with 10 shuffle partitions.
    +    // Test if recovery from checkpoint is successful.
    +    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
    +      query.start().processAllAvailable()
    +
    +      QueryTest.checkAnswer(spark.table("counts").toDF(),
    +        Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
    +        Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Nil)
    +    }
    +
    +    // If the number of partitions is greater, should throw exception.
    +    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") {
    --- End diff --
    
    Seems okay to me. The underlying cause is a `FileNotFoundException`. The error message indicates _Error reading delta file /Users/path/to/checkpoint/state/[operator]/[partition]/[batch].delta_
    > [info] - SPARK-19873: backward compatibility - recover with wrong num shuffle partitions *** FAILED *** (12 seconds, 98 milliseconds)
    [info]   org.apache.spark.sql.streaming.StreamingQueryException: Query badQuery [id = dddc5e7f-1e71-454c-8362-de184444fb5a, runId = b2960c74-257a-4eb1-b242-61d13e20655f] terminated with exception: Job aborted due to stage failure: Task 10 in stage 1.0 failed 1 times, most recent failure: Lost task 10.0 in stage 1.0 (TID 11, localhost, executor driver): java.lang.IllegalStateException: Error reading delta file /Users/kunalkhamar/spark/target/tmp/spark-2816c3be-610f-450c-a821-6d0c68a12d91/state/0/10/1.delta of HDFSStateStoreProvider[id = (op=0, part=10), dir = /Users/kunalkhamar/spark/target/tmp/spark-2816c3be-610f-450c-a821-6d0c68a12d91/state/0/10]: /Users/kunalkhamar/spark/target/tmp/spark-2816c3be-610f-450c-a821-6d0c68a12d91/state/0/10/1.delta does not exist
    [info] 	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:384)
    [info] 	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:336)
    [info] 	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:333)
    [info] 	at scala.Option.getOrElse(Option.scala:121)
    [info] 	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:333)
    [info] 	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
    [info] 	at scala.Option.getOrElse(Option.scala:121)
    [info] 	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:332)
    [info] 	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:239)
    [info] 	at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:191)
    [info] 	at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
    [info] 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    [info] 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    [info] 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    [info] 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    [info] 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    [info] 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    [info] 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    [info] 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    [info] 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    [info] 	at org.apache.spark.scheduler.Task.run(Task.scala:108)
    [info] 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:317)
    [info] 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    [info] 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    [info] 	at java.lang.Thread.run(Thread.java:745)
    [info] Caused by: java.io.FileNotFoundException: File /Users/kunalkhamar/spark/target/tmp/spark-2816c3be-610f-450c-a821-6d0c68a12d91/state/0/10/1.delta does not exist
    [info] 	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
    [info] 	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
    [info] 	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
    [info] 	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
    [info] 	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:142)
    [info] 	at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)
    [info] 	at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:61)
    [info] 	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
    [info] 	at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:381)
    [info] 	... 24 more
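
The failure on partition 10 follows from the state directory layout visible in the trace. Below is a minimal sketch of that reasoning, assuming each partition of a checkpoint writes one delta file per version under `state/<operator>/<partition>/<version>.delta`. `StateLayout` and its methods are hypothetical helpers for illustration, not part of Spark.

```scala
// Hypothetical model of the state store file layout seen in the error message.
object StateLayout {
  // Layout taken from the trace: state/<operator>/<partition>/<version>.delta
  def deltaPath(operatorId: Int, partitionId: Int, version: Int): String =
    s"state/$operatorId/$partitionId/$version.delta"

  // Delta files a checkpoint would contain for one operator and one version,
  // assuming every partition wrote a delta file (an assumption of this sketch).
  def writtenPaths(numPartitions: Int, operatorId: Int, version: Int): Set[String] =
    (0 until numPartitions).map(p => deltaPath(operatorId, p, version)).toSet

  // Partitions that a restarted query would try to load but could not find.
  def missingPartitions(
      checkpointPartitions: Int,
      restartPartitions: Int,
      operatorId: Int = 0,
      version: Int = 1): Seq[Int] = {
    val existing = writtenPaths(checkpointPartitions, operatorId, version)
    (0 until restartPartitions)
      .filterNot(p => existing.contains(deltaPath(operatorId, p, version)))
  }
}
```

With a checkpoint written by 10 partitions and a restart using 15, partitions 10 through 14 have no delta files, which matches the missing `state/0/10/1.delta` reported above.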



[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74224/
    Test PASSed.


[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106285230
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -380,7 +387,27 @@ class StreamExecution(
             logInfo(s"Resuming streaming query, starting with batch $batchId")
             currentBatchId = batchId
             availableOffsets = nextOffsets.toStreamProgress(sources)
    -        offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
    +
    +        // initialize metadata
    +        val shufflePartitionsSparkSession: Int = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)
    +        offsetSeqMetadata = {
    +          if (nextOffsets.metadata.isEmpty) {
    +            OffsetSeqMetadata(0, 0,
    +              Map(SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsSparkSession.toString))
    +          } else {
    +            val metadata = nextOffsets.metadata.get
    +            val shufflePartitionsToUse = metadata.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, {
    +              // For backward compatibility, if # partitions was not recorded in the offset log,
    +              // then ensure it is not missing. The new value is picked up from the conf.
    +              logDebug("Number of shuffle partitions from previous run not found in checkpoint. "
    --- End diff --
    
    Changed to log a warning.
    I rechecked the semantics: it works as expected, and the warning is printed only at the time of the first upgrade.
    Once we restart a query from a v2.1 checkpoint and then stop it, any new offsets written out will contain the number of shuffle partitions. Any future restart will read these new offsets in `StreamExecution.populateStartOffsets->offsetLog.getLatest` and pick up the recorded number of shuffle partitions.
    For future reference, note that we do not rewrite the old offset files to contain the number of shuffle partitions; the semantics are correct because of the call to `offsetLog.getLatest`.
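
The restart behavior described here can be sketched as follows. This is a simplified model, not the actual `StreamExecution` code: `ShufflePartitionRecovery`, `OffsetEntry`, and `resolve` are hypothetical names, and the offset log is represented as an in-memory sequence. Only the latest entry is consulted, which is why older entries never need to be rewritten.

```scala
// Hypothetical model of how a restarted query picks its number of shuffle partitions.
object ShufflePartitionRecovery {
  // Stand-in for one offset-log entry; conf mirrors the Map-based metadata in the diff.
  final case class OffsetEntry(batchId: Long, conf: Map[String, String])

  val ShufflePartitionsKey = "spark.sql.shuffle.partitions"

  // Returns (number of partitions to use, whether a warning would be logged).
  def resolve(log: Seq[OffsetEntry], sessionPartitions: Int): (Int, Boolean) = {
    if (log.isEmpty) {
      // Fresh query: nothing recorded yet, so use the session conf silently.
      (sessionPartitions, false)
    } else {
      // Only the latest entry matters; older entries are left untouched.
      val latest = log.maxBy(_.batchId)
      latest.conf.get(ShufflePartitionsKey) match {
        case Some(recorded) => (recorded.toInt, false)
        // Entry written before the value was recorded: fall back and warn.
        case None => (sessionPartitions, true)
      }
    }
  }
}
```

In this model, the first restart from an old checkpoint returns the session value with a warning. After a new entry with the recorded value is appended, later restarts return the recorded value even if the session conf has changed.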


[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106724948
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -549,9 +581,15 @@ class StreamExecution(
               cd.dataType, cd.timeZoneId)
         }
     
    +    // Reset confs to disallow change in number of partitions
    --- End diff --
    
    Good point, changed.


[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/17216


[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106059467
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
         query.stop()
         assert(query.exception.isEmpty)
       }
    +
    +  test("SPARK-19873: streaming aggregation with change in number of partitions") {
    +    val inputData = MemoryStream[(Int, Int)]
    +    val agg = inputData.toDS().groupBy("_1").count()
    +
    +    testStream(agg, OutputMode.Complete())(
    +      AddData(inputData, (1, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
    +      CheckAnswer((1, 1), (2, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
    +      CheckAnswer((1, 1), (2, 2), (3, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (1, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
    +      CheckAnswer((1, 2), (2, 2), (3, 2)))
    +  }
    +
    +  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
    +    var inputData: MemoryStream[Int] = null
    +    var query: DataStreamWriter[Row] = null
    +
    +    def init(): Unit = {
    +      inputData = MemoryStream[Int]
    +      inputData.addData(1, 2, 3, 4)
    +      inputData.addData(3, 4, 5, 6)
    +      inputData.addData(5, 6, 7, 8)
    +
    +      query = inputData
    +        .toDF()
    +        .groupBy($"value")
    +        .agg(count("*"))
    +        .writeStream
    +        .outputMode("complete")
    +        .format("memory")
    +    }
    +
    +    // Get an existing checkpoint generated by Spark v2.1.
    +    // v2.1 does not record # shuffle partitions in the offset metadata.
    +    val resourceUri =
    +      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
    +    val checkpointDir = new File(resourceUri)
    +
    +    // 1 - Test if recovery from the checkpoint is successful.
    +    init()
    +    withTempDir(dir => {
    +      // Copy the checkpoint to a temp dir to prevent changes to the original.
    +      // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
    +      FileUtils.copyDirectory(checkpointDir, dir)
    +
    +      // Checkpoint data was generated by a query with 10 shuffle partitions.
    +      // In order to test reading from the checkpoint, the checkpoint must have two or more batches,
    +      // since the last batch may be rerun.
    +      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
    +        var streamingQuery: StreamingQuery = null
    +        try {
    +          streamingQuery =
    +            query.queryName("counts").option("checkpointLocation", dir.getCanonicalPath).start()
    +          streamingQuery.processAllAvailable()
    +          inputData.addData(9)
    +          streamingQuery.processAllAvailable()
    +
    +          QueryTest.checkAnswer(spark.table("counts").toDF(),
    +            Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
    +            Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil)
    +        } finally {
    +          if (streamingQuery ne null) {
    +            streamingQuery.stop()
    +          }
    +        }
    +      }
    +    })
    +
    +    // 2 - Check recovery with wrong num shuffle partitions
    +    init()
    +    withTempDir(dir => {
    --- End diff --
    
    nit: 
    `withTempDir { dir => `


[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74708 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74708/testReport)** for PR 17216 at commit [`4733b4e`](https://github.com/apache/spark/commit/4733b4e160bff010521319a1aa61e4f7981c65d6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74226 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74226/testReport)** for PR 17216 at commit [`9ff4d29`](https://github.com/apache/spark/commit/9ff4d2956bbcf6ca65888cc616efac8ffeea733b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106269427
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
         query.stop()
         assert(query.exception.isEmpty)
       }
    +
    +  test("SPARK-19873: streaming aggregation with change in number of partitions") {
    +    val inputData = MemoryStream[(Int, Int)]
    +    val agg = inputData.toDS().groupBy("_1").count()
    +
    +    testStream(agg, OutputMode.Complete())(
    +      AddData(inputData, (1, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
    +      CheckAnswer((1, 1), (2, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
    +      CheckAnswer((1, 1), (2, 2), (3, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (1, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
    +      CheckAnswer((1, 2), (2, 2), (3, 2)))
    +  }
    +
    +  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
    +    var inputData: MemoryStream[Int] = null
    +    var query: DataStreamWriter[Row] = null
    +
    +    def init(): Unit = {
    +      inputData = MemoryStream[Int]
    +      inputData.addData(1, 2, 3, 4)
    +      inputData.addData(3, 4, 5, 6)
    +      inputData.addData(5, 6, 7, 8)
    +
    +      query = inputData
    +        .toDF()
    +        .groupBy($"value")
    +        .agg(count("*"))
    +        .writeStream
    +        .outputMode("complete")
    +        .format("memory")
    +    }
    +
    +    // Get an existing checkpoint generated by Spark v2.1.
    +    // v2.1 does not record # shuffle partitions in the offset metadata.
    +    val resourceUri =
    +      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
    +    val checkpointDir = new File(resourceUri)
    +
    +    // 1 - Test if recovery from the checkpoint is successful.
    +    init()
    --- End diff --
    
    Changed.




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106057080
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -380,7 +387,27 @@ class StreamExecution(
             logInfo(s"Resuming streaming query, starting with batch $batchId")
             currentBatchId = batchId
             availableOffsets = nextOffsets.toStreamProgress(sources)
    -        offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
    +
    +        // initialize metadata
    +        val shufflePartitionsSparkSession: Int = sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)
    +        offsetSeqMetadata = {
    +          if (nextOffsets.metadata.isEmpty) {
    +            OffsetSeqMetadata(0, 0,
    --- End diff --
    
    nit: can you make this call with named params?
    ```
    OffsetSeqMetadata(
      batchWatermarkMs = 0,
      ...
    )
    ```
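
A minimal sketch of what the named-parameter form suggested above might look like. The case class here is a hypothetical stand-in, not Spark's actual `OffsetSeqMetadata`: only `batchWatermarkMs` appears in the review comment, while `batchTimestampMs`, `conf`, and the helper `initialMetadata` are assumed names used for illustration.

```scala
// Hypothetical stand-in for the class under review. Only batchWatermarkMs is
// named in the review comment; the other field names are assumptions.
case class OffsetSeqMetadata(
    batchWatermarkMs: Long = 0L,
    batchTimestampMs: Long = 0L,
    conf: Map[String, String] = Map.empty)

object NamedParamsSketch {
  // Hypothetical helper: builds the initial metadata with every argument named,
  // so the two zeros can no longer be confused with each other at the call site.
  def initialMetadata(shufflePartitions: Int): OffsetSeqMetadata =
    OffsetSeqMetadata(
      batchWatermarkMs = 0,
      batchTimestampMs = 0,
      conf = Map("spark.sql.shuffle.partitions" -> shufflePartitions.toString))
}
```

Compared with the positional call `OffsetSeqMetadata(0, 0, ...)`, naming the arguments makes it clear at the call site which zero is the watermark and which is the timestamp.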




[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

Posted by kunalkhamar <gi...@git.apache.org>.
Github user kunalkhamar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17216#discussion_r106269036
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
         query.stop()
         assert(query.exception.isEmpty)
       }
    +
    +  test("SPARK-19873: streaming aggregation with change in number of partitions") {
    +    val inputData = MemoryStream[(Int, Int)]
    +    val agg = inputData.toDS().groupBy("_1").count()
    +
    +    testStream(agg, OutputMode.Complete())(
    +      AddData(inputData, (1, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
    +      CheckAnswer((1, 1), (2, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (2, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
    +      CheckAnswer((1, 1), (2, 2), (3, 1)),
    +      StopStream,
    +      AddData(inputData, (3, 0), (1, 0)),
    +      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
    +      CheckAnswer((1, 2), (2, 2), (3, 2)))
    +  }
    +
    +  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
    --- End diff --
    
    I see, removed.




[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17216
  
    **[Test build #74344 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74344/testReport)** for PR 17216 at commit [`f6bd071`](https://github.com/apache/spark/commit/f6bd071ca794ed4532eef936b69a54f5d2d68cf1).

