Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2018/05/02 22:19:52 UTC

[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/21220

    [SPARK-24157][SS] Enabled no-data batches in MicroBatchExecution for streaming aggregation and deduplication.

    ## What changes were proposed in this pull request?
    This PR enables `MicroBatchExecution` to run no-data batches if some SparkPlan requires running another batch to output results based on an updated watermark / processing time. In this PR, I have enabled streaming aggregations and streaming deduplication to automatically run an additional batch even if no new data is available. In future PRs, I will enable streaming joins and mapGroupsWithState as well.
    
    Major changes/refactoring done in this PR.
    - Refactoring MicroBatchExecution - A major point of confusion in the MicroBatchExecution control flow (at least to me) was that `populateStartOffsets` internally called `constructNextBatch`, which was not obvious from the name "populateStartOffsets" and made the control flow from the main trigger execution loop very confusing (the main loop in `runActivatedStream` called `constructNextBatch`, but only if `populateStartOffsets` hadn't already called it). The refactoring makes this cleaner.
        - `populateStartOffsets` only updates `availableOffsets` and `committedOffsets`. It does not call `constructNextBatch`.
        - The main loop in `runActivatedStream` calls `constructNextBatch`, which returns true or false reflecting whether the next batch is ready for execution. This method is now idempotent; if a batch has already been constructed, then it will always return true until the batch has been executed.
        - If the next batch is ready then we call `runBatch`; otherwise we sleep.
        - That's it.
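
    The control flow described above can be sketched roughly as follows. This is a minimal model, not the actual `MicroBatchExecution` code: the `Engine` class, its two callback parameters, and the counters are hypothetical names introduced only for illustration.

    ```scala
    // Hypothetical model of the trigger loop; not the actual MicroBatchExecution code.
    object TriggerLoopSketch {
      final class Engine(hasNewData: () => Boolean, needsAnotherBatch: () => Boolean) {
        private var batchConstructed = false
        var batchesRun = 0
        var offsetLogWrites = 0

        // Idempotent: once a batch is constructed, this keeps returning true
        // (without another offset-log write) until runBatch() consumes it.
        def constructNextBatch(): Boolean = {
          if (!batchConstructed && (hasNewData() || needsAnotherBatch())) {
            offsetLogWrites += 1
            batchConstructed = true
          }
          batchConstructed
        }

        def runBatch(): Unit = {
          batchesRun += 1
          batchConstructed = false
        }

        // One iteration of the main loop: run the batch if it is ready, otherwise sleep.
        def trigger(): String =
          if (constructNextBatch()) { runBatch(); "ran batch" } else "sleeping"
      }
    }
    ```

    Calling `constructNextBatch()` twice in a row in this model performs a single offset-log write, which is the idempotency property the refactoring relies on.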
    
    - Refactoring watermark management logic - This has been refactored out of `MicroBatchExecution` into a separate class to simplify `MicroBatchExecution`.
    
    - New method `shouldRunAnotherBatch` in `IncrementalExecution` - This returns true if there is any stateful operation in the last execution plan that requires another batch for state cleanup, etc. This is used to decide whether to construct a batch or not in `constructNextBatch`.
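
    As a rough illustration of this check, here is a minimal sketch. It assumes (hypothetically) that each stateful operator remembers the watermark it last ran with, and that another batch is worthwhile when the watermark has advanced past that value. The types `StatefulOp` and `StatelessOp` and the exact rule are illustrative, not the code in `IncrementalExecution`.

    ```scala
    // Hypothetical model; the operator types and the rule below are illustrative assumptions.
    object ShouldRunAnotherBatchSketch {
      sealed trait Operator
      // A stateful operator together with the watermark it used in the last executed batch.
      final case class StatefulOp(name: String, watermarkUsedMs: Option[Long]) extends Operator
      final case class StatelessOp(name: String) extends Operator

      // True if any stateful operator would see a newer watermark in the next batch,
      // so that a batch without new data could still clean up state or emit results.
      def shouldRunAnotherBatch(lastPlan: Seq[Operator], newWatermarkMs: Long): Boolean =
        lastPlan.exists {
          case StatefulOp(_, Some(usedMs)) => newWatermarkMs > usedMs
          case _ => false
        }
    }
    ```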
    
    - Changes to stream testing framework - Many tests used CheckLastBatch to validate answers. This assumed that there will be no more batches after the last set of input has been processed, so the last batch is the one that has output corresponding to the last input. This is not true anymore. To account for that, I made two changes.
        - `CheckNewAnswer` is a new test action that verifies the new rows generated since the last time the answer was checked by `CheckAnswer`, `CheckNewAnswer` or `CheckLastBatch`. This is agnostic to how many batches occurred between the last check and now. To make this easier, I added a common trait between MemorySink and MemorySinkV2 to abstract out some common methods.
        - `assertNumStateRows` has been updated in the same way to be agnostic to batches when checking the total number of state rows and how many state rows were updated (it sums up updates since the last check).
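
    To show why a batch-agnostic check is needed, here is a minimal sketch using a hypothetical in-memory sink. `SinkSketch` and its methods are made-up names for illustration and are not the `MemorySink` API.

    ```scala
    import scala.collection.mutable

    // Hypothetical stand-in for a memory sink; names are illustrative, not Spark's API.
    object CheckNewAnswerSketch {
      final class SinkSketch[T] {
        private val batches = mutable.ArrayBuffer.empty[Seq[T]]
        private var checkedBatches = 0

        def addBatch(batch: Seq[T]): Unit = batches += batch

        // What a CheckLastBatch-style check sees: only the most recent batch.
        def lastBatch: Seq[T] = batches.lastOption.getOrElse(Seq.empty)

        // What a CheckNewAnswer-style check sees: every row added since the previous
        // check, regardless of how many batches produced those rows.
        def newRowsSinceLastCheck(): Seq[T] = {
          val fresh = batches.drop(checkedBatches).flatten.toSeq
          checkedBatches = batches.length
          fresh
        }
      }
    }
    ```

    In this model, if an empty batch follows the batch that produced output, `lastBatch` is empty while `newRowsSinceLastCheck()` still returns the output rows.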
    
    
    
    ## How was this patch tested?
    - Changes made to existing tests - Tests have been changed in one of the following patterns.
        - Tests where the last input was given again to force another batch to be executed and state cleaned up / output generated were simplified by removing the extra input.
        - Tests using aggregation+watermark where `CheckLastBatch` was replaced with `CheckNewAnswer` to make them batch agnostic.
    - New tests added to check whether the flag works for streaming aggregation and deduplication
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-24157

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21220.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21220
    
----
commit 2ccdc60012b06201b1cf48e8216c21525f731e5a
Author: Tathagata Das <ta...@...>
Date:   2018-05-02T22:15:31Z

    Enabled no-data batches in MicroBatchExecution for streaming aggregation and deduplication.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    @brkyvz Answers to your questions.
    
    1. We have already fixed those emptyDF optimizations. The optimization only kicks in when `df.isStreaming = false`, and emptyDFs generated by sources should have isStreaming = true. That's for v1 sources. For v2 sources, the engine already takes care of that by making sure that `StreamingDataSourceV2Relation.isStreaming` is true, where `StreamingDataSourceV2Relation` is the logical plan leaf inserted into the micro-batch logical plan irrespective of whether it is empty or not.
    
    2. I will do both types of timeouts in flatMapGroupsWithState in a later PR.



---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    **[Test build #90078 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90078/testReport)** for PR 21220 at commit [`7fa11c0`](https://github.com/apache/spark/commit/7fa11c0ac362ace43ce02dee6309a3a632b0c3ee).


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2829/
    Test PASSed.


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90172/
    Test PASSed.


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185962261
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -128,40 +130,49 @@ class MicroBatchExecution(
        * Repeatedly attempts to run batches as data arrives.
        */
       protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    -    triggerExecutor.execute(() => {
    -      startTrigger()
     
    +    triggerExecutor.execute(() => {
           if (isActive) {
    +        var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run
    +        var currentBatchHadNewData = false // Whether the current batch had new data
    +
             reportTimeTaken("triggerExecution") {
    +          startTrigger()
    +
    +          // We'll do this initialization only once every start / restart
               if (currentBatchId < 0) {
    -            // We'll do this initialization only once
                 populateStartOffsets(sparkSessionForStream)
    -            sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    -            logDebug(s"Stream running from $committedOffsets to $availableOffsets")
    -          } else {
    -            constructNextBatch()
    +            logInfo(s"Stream started from $committedOffsets")
               }
    -          if (dataAvailable) {
    -            currentStatus = currentStatus.copy(isDataAvailable = true)
    +
    +          sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    +
    +          // Try to construct the next batch. This will return true only if the next batch is
    +          // ready and runnable. Note that the current batch may be runnable even without
    +          // new data to process as `constructNextBatch` may decide to run a batch for
    +          // state cleanup, etc. `isNewDataAvailable` will be updated to reflect whether new data
    +          // is available or not.
    +          currentBatchIsRunnable = constructNextBatch()
    +
    +          currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable)
    --- End diff --
    
    I could do that. Though it would be duplicating the line `runBatch(sparkSessionForStream)`


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185974962
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -128,40 +130,49 @@ class MicroBatchExecution(
        * Repeatedly attempts to run batches as data arrives.
        */
       protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    -    triggerExecutor.execute(() => {
    -      startTrigger()
     
    +    triggerExecutor.execute(() => {
           if (isActive) {
    +        var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run
    +        var currentBatchHadNewData = false // Whether the current batch had new data
    +
             reportTimeTaken("triggerExecution") {
    +          startTrigger()
    +
    +          // We'll do this initialization only once every start / restart
               if (currentBatchId < 0) {
    -            // We'll do this initialization only once
                 populateStartOffsets(sparkSessionForStream)
    -            sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    -            logDebug(s"Stream running from $committedOffsets to $availableOffsets")
    -          } else {
    -            constructNextBatch()
    +            logInfo(s"Stream started from $committedOffsets")
               }
    -          if (dataAvailable) {
    -            currentStatus = currentStatus.copy(isDataAvailable = true)
    +
    +          sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    +
    +          // Try to construct the next batch. This will return true only if the next batch is
    +          // ready and runnable. Note that the current batch may be runnable even without
    +          // new data to process as `constructNextBatch` may decide to run a batch for
    +          // state cleanup, etc. `isNewDataAvailable` will be updated to reflect whether new data
    +          // is available or not.
    +          currentBatchIsRunnable = constructNextBatch()
    +
    +          currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable)
    --- End diff --
    
    updated.


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185891436
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -373,7 +352,7 @@ class MicroBatchExecution(
                     reader.commit(reader.deserializeOffset(off.json))
                 }
               } else {
    -            throw new IllegalStateException(s"batch $currentBatchId doesn't exist")
    +            throw new IllegalStateException(s"batch ${currentBatchId - 1} doesn't exist")
    --- End diff --
    
    good catch


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185886437
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -128,40 +130,49 @@ class MicroBatchExecution(
        * Repeatedly attempts to run batches as data arrives.
        */
       protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    -    triggerExecutor.execute(() => {
    -      startTrigger()
     
    +    triggerExecutor.execute(() => {
           if (isActive) {
    +        var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run
    +        var currentBatchHadNewData = false // Whether the current batch had new data
    +
             reportTimeTaken("triggerExecution") {
    +          startTrigger()
    +
    +          // We'll do this initialization only once every start / restart
               if (currentBatchId < 0) {
    -            // We'll do this initialization only once
                 populateStartOffsets(sparkSessionForStream)
    -            sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    -            logDebug(s"Stream running from $committedOffsets to $availableOffsets")
    -          } else {
    -            constructNextBatch()
    +            logInfo(s"Stream started from $committedOffsets")
               }
    -          if (dataAvailable) {
    -            currentStatus = currentStatus.copy(isDataAvailable = true)
    +
    +          sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    +
    +          // Try to construct the next batch. This will return true only if the next batch is
    +          // ready and runnable. Note that the current batch may be runnable even without
    +          // new data to process as `constructNextBatch` may decide to run a batch for
    +          // state cleanup, etc. `isNewDataAvailable` will be updated to reflect whether new data
    +          // is available or not.
    +          currentBatchIsRunnable = constructNextBatch()
    +
    +          currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable)
    --- End diff --
    
    then you can do something like:
    ```scala
    if (currentBatchIsRunnable && currentBatchHadNewData) {
      updateStatusMessage("Processing new data")
      runBatch(sparkSessionForStream)
    } else if (currentBatchIsRunnable) {
      updateStatusMessage("Processing empty trigger to timeout state") // or whatever
      runBatch(sparkSessionForStream)
    } else {
      updateStatusMessage("Waiting for data to arrive")
    }
    ```


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2828/
    Test FAILed.


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185883611
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -128,40 +130,49 @@ class MicroBatchExecution(
        * Repeatedly attempts to run batches as data arrives.
        */
       protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    -    triggerExecutor.execute(() => {
    -      startTrigger()
     
    +    triggerExecutor.execute(() => {
           if (isActive) {
    +        var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run
    +        var currentBatchHadNewData = false // Whether the current batch had new data
    +
             reportTimeTaken("triggerExecution") {
    +          startTrigger()
    +
    +          // We'll do this initialization only once every start / restart
               if (currentBatchId < 0) {
    -            // We'll do this initialization only once
                 populateStartOffsets(sparkSessionForStream)
    -            sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    -            logDebug(s"Stream running from $committedOffsets to $availableOffsets")
    -          } else {
    -            constructNextBatch()
    +            logInfo(s"Stream started from $committedOffsets")
               }
    -          if (dataAvailable) {
    -            currentStatus = currentStatus.copy(isDataAvailable = true)
    +
    +          sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    +
    +          // Try to construct the next batch. This will return true only if the next batch is
    +          // ready and runnable. Note that the current batch may be runnable even without
    +          // new data to process as `constructNextBatch` may decide to run a batch for
    +          // state cleanup, etc. `isNewDataAvailable` will be updated to reflect whether new data
    +          // is available or not.
    +          currentBatchIsRunnable = constructNextBatch()
    +
    +          currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable)
    +          if (currentBatchIsRunnable) {
                 updateStatusMessage("Processing new data")
    +            // Remember whether the current batch has data or not. This will be required later
    --- End diff --
    
    the status message above isn't completely truthful if we are running a zero-data batch


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185975133
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -128,40 +130,49 @@ class MicroBatchExecution(
        * Repeatedly attempts to run batches as data arrives.
        */
       protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    -    triggerExecutor.execute(() => {
    -      startTrigger()
     
    +    triggerExecutor.execute(() => {
           if (isActive) {
    +        var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run
    +        var currentBatchHadNewData = false // Whether the current batch had new data
    +
             reportTimeTaken("triggerExecution") {
    +          startTrigger()
    +
    +          // We'll do this initialization only once every start / restart
               if (currentBatchId < 0) {
    -            // We'll do this initialization only once
                 populateStartOffsets(sparkSessionForStream)
    -            sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    -            logDebug(s"Stream running from $committedOffsets to $availableOffsets")
    -          } else {
    -            constructNextBatch()
    +            logInfo(s"Stream started from $committedOffsets")
               }
    -          if (dataAvailable) {
    -            currentStatus = currentStatus.copy(isDataAvailable = true)
    +
    +          sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    +
    +          // Try to construct the next batch. This will return true only if the next batch is
    +          // ready and runnable. Note that the current batch may be runnable even without
    +          // new data to process as `constructNextBatch` may decide to run a batch for
    +          // state cleanup, etc. `isNewDataAvailable` will be updated to reflect whether new data
    +          // is available or not.
    +          currentBatchIsRunnable = constructNextBatch()
    +
    +          currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable)
    +          if (currentBatchIsRunnable) {
                 updateStatusMessage("Processing new data")
    +            // Remember whether the current batch has data or not. This will be required later
    +            // for bookkeeping after running the batch, when `isNewDataAvailable` will have changed
    +            // to false as the batch would have already processed the available data.
    +            currentBatchHadNewData = isNewDataAvailable
    +
                 runBatch(sparkSessionForStream)
    +          } else {
    +            updateStatusMessage("Waiting for data to arrive")
               }
             }
    -        // Report trigger as finished and construct progress object.
    -        finishTrigger(dataAvailable)
    -        if (dataAvailable) {
    -          // Update committed offsets.
    -          commitLog.add(currentBatchId)
    --- End diff --
    
    @brkyvz this also fixes a different bug-ish thing where we were not reporting the time taken to write to the commit log as part of the "triggerExecution", since it's outside the `reportTimeTaken("triggerExecution")`
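
    A small sketch of why placement matters for this metric. It assumes a hypothetical `Reporter` class with an injectable clock; it is not Spark's progress-reporting code, and only the name `reportTimeTaken` is borrowed from the diff.

    ```scala
    import scala.collection.mutable

    // Hypothetical timing helper with an injectable clock; not Spark's implementation.
    object TimeTakenSketch {
      final class Reporter(nowMs: () => Long) {
        val durationsMs = mutable.Map.empty[String, Long]

        // Adds the elapsed time of `body` to the named bucket, even if `body` throws.
        def reportTimeTaken[T](name: String)(body: => T): T = {
          val start = nowMs()
          try body
          finally {
            durationsMs(name) = durationsMs.getOrElse(name, 0L) + (nowMs() - start)
          }
        }
      }
    }
    ```

    Work done after the block returns (such as a commit-log write placed outside it) never reaches the bucket, whereas the same work moved inside the block is counted.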


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185892675
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -384,22 +363,21 @@ class MicroBatchExecution(
               commitLog.purge(currentBatchId - minLogEntriesToMaintain)
             }
           }
    +      noNewData = false
         } else {
    -      awaitProgressLock.lock()
    -      try {
    -        // Wake up any threads that are waiting for the stream to progress.
    -        awaitProgressLockCondition.signalAll()
    -      } finally {
    -        awaitProgressLock.unlock()
    -      }
    +      noNewData = true
    +      awaitProgressLockCondition.signalAll()
         }
    +    shouldConstructNextBatch
       }
     
       /**
        * Processes any data available between `availableOffsets` and `committedOffsets`.
        * @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with.
        */
       private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
    +    logDebug(s"Running batch $currentBatchId")
    +
    --- End diff --
    
    I guess we're going to see if all sources follow the contract of returning an empty dataframe if the start and end offsets are the same
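
    For illustration, the contract mentioned here could look like the following for a toy source. `LogSource` is a hypothetical in-memory source with integer offsets and a half-open `[start, end)` range, which is an assumption of this sketch rather than how Spark sources define offsets.

    ```scala
    // Hypothetical source over an in-memory log; offsets are plain indices here.
    object EmptyRangeSketch {
      final class LogSource[T](log: IndexedSeq[T]) {
        // Returns the records in [start, end). When start == end the result is empty,
        // which is the behaviour a no-data batch depends on.
        def getBatch(start: Int, end: Int): Seq[T] = {
          require(start <= end, s"start offset $start is after end offset $end")
          log.slice(start, end)
        }
      }
    }
    ```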


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185678739
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -266,93 +276,62 @@ class MicroBatchExecution(
       }
     
       /**
    -   * Queries all of the sources to see if any new data is available. When there is new data the
    -   * batchId counter is incremented and a new log entry is written with the newest offsets.
    +   * Attempts to construct the next batch based on whether new data is available and/or updated
    --- End diff --
    
    this paragraph is highly confusing. Could you please reword? Maybe something like:
    ```
    Attempts to construct a batch according to:
     - Availability of new data
     - Existence of timeouts in stateful operators
    ```


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185893229
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.execution.SparkPlan
    +
    +class WatermarkTracker extends Logging {
    +  private val operatorToWatermarkMap = mutable.HashMap[Int, Long]()
    +  private var watermarkMs: Long = 0
    +  private var updated = false
    +
    +  def setWatermark(newWatermarkMs: Long): Unit = synchronized {
    +    watermarkMs = newWatermarkMs
    +  }
    +
    +  def updateWatermark(executedPlan: SparkPlan): Unit = synchronized {
    +    val watermarkOperators = executedPlan.collect {
    --- End diff --
    
    I would comment on the contracts. We expect a certain ordering of stateful operators across triggers; therefore we turn off CBO, etc.
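
    To make the ordering concern concrete, here is a sketch under the assumption that watermarks are tracked per operator and keyed only by the operator's position in the plan (the `HashMap[Int, Long]` in the diff suggests an integer key). `WatermarkOp`, `Tracker`, and the update rule are hypothetical simplifications, not the `WatermarkTracker` code.

    ```scala
    import scala.collection.mutable

    // Hypothetical simplification: operators are identified only by their plan position.
    object PositionalWatermarkSketch {
      final case class WatermarkOp(column: String, maxEventTimeMs: Long, delayMs: Long)

      final class Tracker {
        private val operatorToWatermark = mutable.HashMap.empty[Int, Long]

        // Keys each operator's watermark by its index in the plan and keeps the largest value.
        def update(plan: Seq[WatermarkOp]): Unit =
          plan.zipWithIndex.foreach { case (op, index) =>
            val candidate = op.maxEventTimeMs - op.delayMs
            if (operatorToWatermark.get(index).forall(candidate > _)) {
              operatorToWatermark(index) = candidate
            }
          }

        def watermarkAt(index: Int): Option[Long] = operatorToWatermark.get(index)
      }
    }
    ```

    If the same two operators appear in a different order in a later trigger, each one is compared against the other's previous watermark, so the per-operator values are silently mixed up. That is the kind of breakage a stable operator ordering avoids.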


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185961121
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -266,93 +276,62 @@ class MicroBatchExecution(
       }
     
       /**
    -   * Queries all of the sources to see if any new data is available. When there is new data the
    -   * batchId counter is incremented and a new log entry is written with the newest offsets.
    +   * Attempts to construct the next batch based on whether new data is available and/or updated
    +   * metadata is such that another batch needs to be run for state clean up / additional output
    +   * generation even without new data. Returns true only if the next batch should be executed.
    +   *
    +   * Here is the high-level logic on how this constructs the next batch.
    +   * - Check each source whether new data is available
    +   * - Updated the query's metadata and check using the last execution whether there is any need
    +   *   to run another batch (for state clean up, etc.)
    +   * - If either of the above is true, then construct the next batch by committing to the offset
    +   *   log that range of offsets that the next batch will process.
        */
    -  private def constructNextBatch(): Unit = {
    -    // Check to see what new data is available.
    -    val hasNewData = {
    -      awaitProgressLock.lock()
    -      try {
    -        // Generate a map from each unique source to the next available offset.
    -        val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
    -          case s: Source =>
    -            updateStatusMessage(s"Getting offsets from $s")
    -            reportTimeTaken("getOffset") {
    -              (s, s.getOffset)
    -            }
    -          case s: MicroBatchReader =>
    -            updateStatusMessage(s"Getting offsets from $s")
    -            reportTimeTaken("setOffsetRange") {
    -              // Once v1 streaming source execution is gone, we can refactor this away.
    -              // For now, we set the range here to get the source to infer the available end offset,
    -              // get that offset, and then set the range again when we later execute.
    -              s.setOffsetRange(
    -                toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
    -                Optional.empty())
    -            }
    -
    -            val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
    -            (s, Option(currentOffset))
    -        }.toMap
    -        availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
    -
    -        if (dataAvailable) {
    -          true
    -        } else {
    -          noNewData = true
    -          false
    +  private def constructNextBatch(): Boolean = withProgressLocked {
    +    // If new data is already available that means this method has already been called before
    +    // and it must have already committed the offset range of next batch to the offset log.
    +    // Hence do nothing, just return true.
    +    if (isNewDataAvailable) return true
    +
    +    // Generate a map from each unique source to the next available offset.
    +    val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
    +      case s: Source =>
    +        updateStatusMessage(s"Getting offsets from $s")
    +        reportTimeTaken("getOffset") {
    +          (s, s.getOffset)
             }
    -      } finally {
    -        awaitProgressLock.unlock()
    -      }
    -    }
    -    if (hasNewData) {
    -      var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
    -      // Update the eventTime watermarks if we find any in the plan.
    -      if (lastExecution != null) {
    -        lastExecution.executedPlan.collect {
    -          case e: EventTimeWatermarkExec => e
    -        }.zipWithIndex.foreach {
    -          case (e, index) if e.eventTimeStats.value.count > 0 =>
    -            logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
    -            val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
    -            val prevWatermarkMs = watermarkMsMap.get(index)
    -            if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
    -              watermarkMsMap.put(index, newWatermarkMs)
    -            }
    -
    -          // Populate 0 if we haven't seen any data yet for this watermark node.
    -          case (_, index) =>
    -            if (!watermarkMsMap.isDefinedAt(index)) {
    -              watermarkMsMap.put(index, 0)
    -            }
    +      case s: MicroBatchReader =>
    +        updateStatusMessage(s"Getting offsets from $s")
    +        reportTimeTaken("setOffsetRange") {
    +          // Once v1 streaming source execution is gone, we can refactor this away.
    +          // For now, we set the range here to get the source to infer the available end offset,
    +          // get that offset, and then set the range again when we later execute.
    +          s.setOffsetRange(
    +            toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
    +            Optional.empty())
             }
     
    -        // Update the global watermark to the minimum of all watermark nodes.
    -        // This is the safest option, because only the global watermark is fault-tolerant. Making
    -        // it the minimum of all individual watermarks guarantees it will never advance past where
    -        // any individual watermark operator would be if it were in a plan by itself.
    -        if(!watermarkMsMap.isEmpty) {
    -          val newWatermarkMs = watermarkMsMap.minBy(_._2)._2
    -          if (newWatermarkMs > batchWatermarkMs) {
    -            logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
    -            batchWatermarkMs = newWatermarkMs
    -          } else {
    -            logDebug(
    -              s"Event time didn't move: $newWatermarkMs < " +
    -                s"$batchWatermarkMs")
    -          }
    -        }
    -      }
    -      offsetSeqMetadata = offsetSeqMetadata.copy(
    -        batchWatermarkMs = batchWatermarkMs,
    -        batchTimestampMs = triggerClock.getTimeMillis()) // Current batch timestamp in milliseconds
    +        val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
    +        (s, Option(currentOffset))
    +    }.toMap
    +    availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
    +
    +    // Update the query metadata
    +    offsetSeqMetadata = offsetSeqMetadata.copy(
    +      batchWatermarkMs = watermarkTracker.currentWatermark,
    +      batchTimestampMs = triggerClock.getTimeMillis())
    +
    +    // Check whether next batch should be constructed
    +    val lastExecutionRequiresAnotherBatch =
    +      sparkSession.sessionState.conf.streamingNoDataMicroBatchesEnabled &&
    --- End diff --
    
    Good catch. We probably should. We don't want the value to change due to changes in the user-facing, non-cloned session.
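
    A minimal sketch of the concern, using hypothetical names (`MutableConf`, `QueryRun`, and the key `noDataMicroBatches.enabled`) rather than Spark's actual session and conf classes: reading the flag from a snapshot taken at query start keeps later changes in the user-facing configuration from altering a running query.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a user-facing, mutable session configuration.
final class MutableConf {
  private val settings = mutable.Map[String, String]()

  def set(key: String, value: String): Unit = settings.update(key, value)

  def getBoolean(key: String, default: Boolean): Boolean =
    settings.get(key).map(_.toBoolean).getOrElse(default)

  // Copy the current settings so that later changes do not leak into the copy.
  def snapshot(): MutableConf = {
    val copy = new MutableConf
    settings.foreach { case (k, v) => copy.set(k, v) }
    copy
  }
}

// Hypothetical query run that reads its flag from the conf it was given at start.
final class QueryRun(conf: MutableConf) {
  def noDataBatchesEnabled: Boolean =
    conf.getBoolean("noDataMicroBatches.enabled", default = true)
}

object ConfSnapshotExample {
  def main(args: Array[String]): Unit = {
    val userConf = new MutableConf
    userConf.set("noDataMicroBatches.enabled", "true")

    // The query takes a snapshot at start ...
    val run = new QueryRun(userConf.snapshot())

    // ... so a later change by the user does not affect the running query.
    userConf.set("noDataMicroBatches.enabled", "false")
    println(run.noDataBatchesEnabled)
  }
}
```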


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185973062
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -128,40 +130,49 @@ class MicroBatchExecution(
        * Repeatedly attempts to run batches as data arrives.
        */
       protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    -    triggerExecutor.execute(() => {
    -      startTrigger()
     
    +    triggerExecutor.execute(() => {
           if (isActive) {
    +        var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run
    +        var currentBatchHadNewData = false // Whether the current batch had new data
    +
             reportTimeTaken("triggerExecution") {
    +          startTrigger()
    +
    +          // We'll do this initialization only once every start / restart
               if (currentBatchId < 0) {
    -            // We'll do this initialization only once
                 populateStartOffsets(sparkSessionForStream)
    -            sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    -            logDebug(s"Stream running from $committedOffsets to $availableOffsets")
    -          } else {
    -            constructNextBatch()
    +            logInfo(s"Stream started from $committedOffsets")
               }
    -          if (dataAvailable) {
    -            currentStatus = currentStatus.copy(isDataAvailable = true)
    +
    +          sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    +
    +          // Try to construct the next batch. This will return true only if the next batch is
    +          // ready and runnable. Note that the current batch may be runnable even without
    +          // new data to process as `constructNextBatch` may decide to run a batch for
    +          // state cleanup, etc. `isNewDataAvailable` will be updated to reflect whether new data
    +          // is available or not.
    +          currentBatchIsRunnable = constructNextBatch()
    +
    +          currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable)
    +          if (currentBatchIsRunnable) {
                 updateStatusMessage("Processing new data")
    +            // Remember whether the current batch has data or not. This will be required later
    --- End diff --
    
    Fixed it. 


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2891/
    Test PASSed.


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185962542
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -266,93 +276,62 @@ class MicroBatchExecution(
       }
     
       /**
    -   * Queries all of the sources to see if any new data is available. When there is new data the
    -   * batchId counter is incremented and a new log entry is written with the newest offsets.
    +   * Attempts to construct the next batch based on whether new data is available and/or updated
    +   * metadata is such that another batch needs to be run for state clean up / additional output
    +   * generation even without new data. Returns true only if the next batch should be executed.
    +   *
    +   * Here is the high-level logic on how this constructs the next batch.
    +   * - Check each source whether new data is available
    +   * - Updated the query's metadata and check using the last execution whether there is any need
    +   *   to run another batch (for state clean up, etc.)
    +   * - If either of the above is true, then construct the next batch by committing to the offset
    +   *   log that range of offsets that the next batch will process.
        */
    -  private def constructNextBatch(): Unit = {
    -    // Check to see what new data is available.
    -    val hasNewData = {
    -      awaitProgressLock.lock()
    -      try {
    -        // Generate a map from each unique source to the next available offset.
    -        val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
    -          case s: Source =>
    -            updateStatusMessage(s"Getting offsets from $s")
    -            reportTimeTaken("getOffset") {
    -              (s, s.getOffset)
    -            }
    -          case s: MicroBatchReader =>
    -            updateStatusMessage(s"Getting offsets from $s")
    -            reportTimeTaken("setOffsetRange") {
    -              // Once v1 streaming source execution is gone, we can refactor this away.
    -              // For now, we set the range here to get the source to infer the available end offset,
    -              // get that offset, and then set the range again when we later execute.
    -              s.setOffsetRange(
    -                toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
    -                Optional.empty())
    -            }
    -
    -            val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
    -            (s, Option(currentOffset))
    -        }.toMap
    -        availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
    -
    -        if (dataAvailable) {
    -          true
    -        } else {
    -          noNewData = true
    -          false
    +  private def constructNextBatch(): Boolean = withProgressLocked {
    +    // If new data is already available that means this method has already been called before
    +    // and it must have already committed the offset range of next batch to the offset log.
    +    // Hence do nothing, just return true.
    +    if (isNewDataAvailable) return true
    --- End diff --
    
    Regarding the second question, which method are you talking about? `isNewDataAvailable`? That's being called.
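
    The early return in the quoted hunk can be illustrated with a minimal sketch. `BatchPlanner`, `latestOffset`, and the single `Long` offsets are hypothetical simplifications, not the actual `MicroBatchExecution` code, and the no-data-batch check is left out. Once a batch has been constructed, repeated calls return true without writing to the offset log again until the batch has been executed.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified planner with a single source whose offsets are plain Longs.
final class BatchPlanner(latestOffset: () => Long) {
  private var committedOffset = 0L      // end offset of the last executed batch
  private var availableOffset = 0L      // end offset planned for the next batch
  val offsetLog = ArrayBuffer[Long]()   // stand-in for the durable offset log

  private def isNewDataAvailable: Boolean = availableOffset > committedOffset

  // Returns true if a batch is ready to run. Idempotent: if a batch was already
  // constructed and not yet run, nothing is written and true is returned.
  def constructNextBatch(): Boolean = synchronized {
    if (isNewDataAvailable) {
      true
    } else {
      availableOffset = math.max(availableOffset, latestOffset())
      if (isNewDataAvailable) {
        offsetLog += availableOffset    // commit the planned range exactly once
        true
      } else {
        false
      }
    }
  }

  // Marks the planned batch as executed.
  def runBatch(): Unit = synchronized {
    committedOffset = availableOffset
  }
}
```

    Calling `constructNextBatch()` twice before `runBatch()` leaves a single entry in `offsetLog`, which is the property the early return is meant to provide.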


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185974949
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -266,93 +276,62 @@ class MicroBatchExecution(
       }
     
       /**
    -   * Queries all of the sources to see if any new data is available. When there is new data the
    -   * batchId counter is incremented and a new log entry is written with the newest offsets.
    +   * Attempts to construct the next batch based on whether new data is available and/or updated
    --- End diff --
    
    updated.


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185887294
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -266,93 +276,62 @@ class MicroBatchExecution(
       }
     
       /**
    -   * Queries all of the sources to see if any new data is available. When there is new data the
    -   * batchId counter is incremented and a new log entry is written with the newest offsets.
    +   * Attempts to construct the next batch based on whether new data is available and/or updated
    +   * metadata is such that another batch needs to be run for state clean up / additional output
    +   * generation even without new data. Returns true only if the next batch should be executed.
    +   *
    +   * Here is the high-level logic on how this constructs the next batch.
    +   * - Check each source whether new data is available
    +   * - Updated the query's metadata and check using the last execution whether there is any need
    +   *   to run another batch (for state clean up, etc.)
    +   * - If either of the above is true, then construct the next batch by committing to the offset
    +   *   log that range of offsets that the next batch will process.
        */
    -  private def constructNextBatch(): Unit = {
    -    // Check to see what new data is available.
    -    val hasNewData = {
    -      awaitProgressLock.lock()
    -      try {
    -        // Generate a map from each unique source to the next available offset.
    -        val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
    -          case s: Source =>
    -            updateStatusMessage(s"Getting offsets from $s")
    -            reportTimeTaken("getOffset") {
    -              (s, s.getOffset)
    -            }
    -          case s: MicroBatchReader =>
    -            updateStatusMessage(s"Getting offsets from $s")
    -            reportTimeTaken("setOffsetRange") {
    -              // Once v1 streaming source execution is gone, we can refactor this away.
    -              // For now, we set the range here to get the source to infer the available end offset,
    -              // get that offset, and then set the range again when we later execute.
    -              s.setOffsetRange(
    -                toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
    -                Optional.empty())
    -            }
    -
    -            val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
    -            (s, Option(currentOffset))
    -        }.toMap
    -        availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
    -
    -        if (dataAvailable) {
    -          true
    -        } else {
    -          noNewData = true
    -          false
    +  private def constructNextBatch(): Boolean = withProgressLocked {
    +    // If new data is already available that means this method has already been called before
    +    // and it must have already committed the offset range of next batch to the offset log.
    +    // Hence do nothing, just return true.
    +    if (isNewDataAvailable) return true
    --- End diff --
    
    I don't see anyone else calling this method


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r186016680
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
    @@ -279,13 +279,10 @@ class FileStreamSinkSuite extends StreamTest {
           check() // nothing emitted yet
     
           addTimestamp(104, 123) // watermark = 90 before this, watermark = 123 - 10 = 113 after this
    -      check() // nothing emitted yet
    +      check((100L, 105L) -> 2L)  // no-data-batch emits results on 100-105,
    --- End diff --
    
    I think most such feature flags in Spark are designed to be turned off at runtime, not in code.
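
    The arithmetic in the quoted test comment can be checked with a small sketch. It assumes a 10-unit watermark delay and 5-unit tumbling windows, as the comment implies, and that a window is final once its end is at or below the watermark; `WatermarkExample` and its helpers are hypothetical names, not part of the test suite.

```scala
object WatermarkExample {
  // Watermark = maximum observed event time minus the allowed delay.
  def watermark(maxEventTime: Long, delay: Long): Long = maxEventTime - delay

  // Start and end of the tumbling window containing a non-negative event time.
  def windowOf(eventTime: Long, size: Long): (Long, Long) = {
    val start = eventTime - (eventTime % size)
    (start, start + size)
  }

  // Assumption: a window is final once its end is at or below the watermark.
  def isFinal(window: (Long, Long), wm: Long): Boolean = window._2 <= wm

  def main(args: Array[String]): Unit = {
    val wm = watermark(maxEventTime = 123L, delay = 10L)
    println(wm)                                // 113
    println(isFinal(windowOf(104L, 5L), wm))   // true: window (100, 105)
    println(isFinal(windowOf(123L, 5L), wm))   // false: window (120, 125)
  }
}
```

    Under these assumptions the 100-105 window becomes final as soon as the watermark reaches 113, which is why the test now expects it to be emitted by a no-data batch instead of waiting for more input.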


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185887201
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -266,93 +276,62 @@ class MicroBatchExecution(
       }
     
       /**
    -   * Queries all of the sources to see if any new data is available. When there is new data the
    -   * batchId counter is incremented and a new log entry is written with the newest offsets.
    +   * Attempts to construct the next batch based on whether new data is available and/or updated
    +   * metadata is such that another batch needs to be run for state clean up / additional output
    +   * generation even without new data. Returns true only if the next batch should be executed.
    +   *
    +   * Here is the high-level logic on how this constructs the next batch.
    +   * - Check each source whether new data is available
    +   * - Updated the query's metadata and check using the last execution whether there is any need
    +   *   to run another batch (for state clean up, etc.)
    +   * - If either of the above is true, then construct the next batch by committing to the offset
    +   *   log that range of offsets that the next batch will process.
        */
    -  private def constructNextBatch(): Unit = {
    -    // Check to see what new data is available.
    -    val hasNewData = {
    -      awaitProgressLock.lock()
    -      try {
    -        // Generate a map from each unique source to the next available offset.
    -        val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
    -          case s: Source =>
    -            updateStatusMessage(s"Getting offsets from $s")
    -            reportTimeTaken("getOffset") {
    -              (s, s.getOffset)
    -            }
    -          case s: MicroBatchReader =>
    -            updateStatusMessage(s"Getting offsets from $s")
    -            reportTimeTaken("setOffsetRange") {
    -              // Once v1 streaming source execution is gone, we can refactor this away.
    -              // For now, we set the range here to get the source to infer the available end offset,
    -              // get that offset, and then set the range again when we later execute.
    -              s.setOffsetRange(
    -                toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
    -                Optional.empty())
    -            }
    -
    -            val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
    -            (s, Option(currentOffset))
    -        }.toMap
    -        availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
    -
    -        if (dataAvailable) {
    -          true
    -        } else {
    -          noNewData = true
    -          false
    +  private def constructNextBatch(): Boolean = withProgressLocked {
    +    // If new data is already available that means this method has already been called before
    +    // and it must have already committed the offset range of next batch to the offset log.
    +    // Hence do nothing, just return true.
    +    if (isNewDataAvailable) return true
    --- End diff --
    
    how is this possible?


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    @brkyvz Answers to your questions.
    
    1. We have already fixed those emptyDF optimizations. The optimization only kicks in when `df.isStreaming = false`, and empty DataFrames generated by sources should have `isStreaming = true`. That's for v1 sources. For v2 sources, the engine already takes care of this by making sure that `StreamingDataSourceV2Relation.isStreaming` is true, where `StreamingDataSourceV2Relation` is the logical plan leaf inserted into the micro-batch logical plan whether or not the batch is empty.
    
    2. I will handle both types of timeouts in flatMapGroupsWithState in a later PR.



---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185972962
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -128,40 +130,49 @@ class MicroBatchExecution(
        * Repeatedly attempts to run batches as data arrives.
        */
       protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    -    triggerExecutor.execute(() => {
    -      startTrigger()
     
    +    triggerExecutor.execute(() => {
           if (isActive) {
    +        var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run
    +        var currentBatchHadNewData = false // Whether the current batch had new data
    +
             reportTimeTaken("triggerExecution") {
    +          startTrigger()
    --- End diff --
    
    correct. fixed.


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    **[Test build #90080 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90080/testReport)** for PR 21220 at commit [`7fa11c0`](https://github.com/apache/spark/commit/7fa11c0ac362ace43ce02dee6309a3a632b0c3ee).


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    **[Test build #90172 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90172/testReport)** for PR 21220 at commit [`da3fd2f`](https://github.com/apache/spark/commit/da3fd2f8510482e3e71cc37a9da2207e3aef1ef0).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    **[Test build #90080 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90080/testReport)** for PR 21220 at commit [`7fa11c0`](https://github.com/apache/spark/commit/7fa11c0ac362ace43ce02dee6309a3a632b0c3ee).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class WatermarkTracker extends Logging `
      * `trait MemorySinkBase extends BaseStreamingSink `
      * `class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink`
      * `class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkBase with Logging `


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90078/
    Test FAILed.


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2830/
    Test PASSed.


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185885198
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -128,40 +130,49 @@ class MicroBatchExecution(
        * Repeatedly attempts to run batches as data arrives.
        */
       protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    -    triggerExecutor.execute(() => {
    -      startTrigger()
     
    +    triggerExecutor.execute(() => {
           if (isActive) {
    +        var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run
    +        var currentBatchHadNewData = false // Whether the current batch had new data
    +
             reportTimeTaken("triggerExecution") {
    +          startTrigger()
    +
    +          // We'll do this initialization only once every start / restart
               if (currentBatchId < 0) {
    -            // We'll do this initialization only once
                 populateStartOffsets(sparkSessionForStream)
    -            sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    -            logDebug(s"Stream running from $committedOffsets to $availableOffsets")
    -          } else {
    -            constructNextBatch()
    +            logInfo(s"Stream started from $committedOffsets")
               }
    -          if (dataAvailable) {
    -            currentStatus = currentStatus.copy(isDataAvailable = true)
    +
    +          sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    +
    +          // Try to construct the next batch. This will return true only if the next batch is
    +          // ready and runnable. Note that the current batch may be runnable even without
    +          // new data to process as `constructNextBatch` may decide to run a batch for
    +          // state cleanup, etc. `isNewDataAvailable` will be updated to reflect whether new data
    +          // is available or not.
    +          currentBatchIsRunnable = constructNextBatch()
    +
    +          currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable)
    --- End diff --
    
    why not set `currentBatchHadNewData` here? It's not immediately clear to me if `isNewDataAvailable` can update between here and 3 lines below. 


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185893837
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.execution.SparkPlan
    +
    +class WatermarkTracker extends Logging {
    +  private val operatorToWatermarkMap = mutable.HashMap[Int, Long]()
    +  private var watermarkMs: Long = 0
    +  private var updated = false
    +
    +  def setWatermark(newWatermarkMs: Long): Unit = synchronized {
    +    watermarkMs = newWatermarkMs
    +  }
    +
    +  def updateWatermark(executedPlan: SparkPlan): Unit = synchronized {
    +    val watermarkOperators = executedPlan.collect {
    +      case e: EventTimeWatermarkExec => e
    +    }
    +    if (watermarkOperators.isEmpty) return
    +
    +
    +    watermarkOperators.zipWithIndex.foreach {
    +      case (e, index) if e.eventTimeStats.value.count > 0 =>
    +        logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
    +        val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
    +        val prevWatermarkMs = operatorToWatermarkMap.get(index)
    +        if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
    +          operatorToWatermarkMap.put(index, newWatermarkMs)
    +        }
    +
    +      // Populate 0 if we haven't seen any data yet for this watermark node.
    +      case (_, index) =>
    +        if (!operatorToWatermarkMap.isDefinedAt(index)) {
    +          operatorToWatermarkMap.put(index, 0)
    +        }
    +    }
    +
    +    // Update the global watermark to the minimum of all watermark nodes.
    +    // This is the safest option, because only the global watermark is fault-tolerant. Making
    +    // it the minimum of all individual watermarks guarantees it will never advance past where
    +    // any individual watermark operator would be if it were in a plan by itself.
    +    val newWatermarkMs = operatorToWatermarkMap.minBy(_._2)._2
    +    if (newWatermarkMs > watermarkMs) {
    +      logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
    +      watermarkMs = newWatermarkMs
    +      updated = true
    +    } else {
    +      logDebug(s"Event time didn't move: $newWatermarkMs < $watermarkMs")
    +      updated = false
    +    }
    +  }
    +
    +  def watermarkUpdated: Boolean = synchronized { updated }
    --- End diff --
    
    Is this used anywhere?


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    LGTM.
    
    Obviously shouldn't block this PR, but MicroBatchExecution is structured in a way that makes it hard to review changes like this. It seems like changing the condition under which new batches are run should have been a much more local change than it ended up having to be.


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    **[Test build #90078 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90078/testReport)** for PR 21220 at commit [`7fa11c0`](https://github.com/apache/spark/commit/7fa11c0ac362ace43ce02dee6309a3a632b0c3ee).
     * This patch **fails SparkR unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class WatermarkTracker extends Logging `
      * `trait MemorySinkBase extends BaseStreamingSink `
      * `class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink`
      * `class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkBase with Logging `


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90080/


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    **[Test build #90172 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90172/testReport)** for PR 21220 at commit [`da3fd2f`](https://github.com/apache/spark/commit/da3fd2f8510482e3e71cc37a9da2207e3aef1ef0).


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185961340
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -266,93 +276,62 @@ class MicroBatchExecution(
       }
     
       /**
    -   * Queries all of the sources to see if any new data is available. When there is new data the
    -   * batchId counter is incremented and a new log entry is written with the newest offsets.
    +   * Attempts to construct the next batch based on whether new data is available and/or updated
    +   * metadata is such that another batch needs to be run for state clean up / additional output
    +   * generation even without new data. Returns true only if the next batch should be executed.
    +   *
    +   * Here is the high-level logic on how this constructs the next batch.
    +   * - Check each source whether new data is available
    +   * - Updated the query's metadata and check using the last execution whether there is any need
    +   *   to run another batch (for state clean up, etc.)
    +   * - If either of the above is true, then construct the next batch by committing to the offset
    +   *   log that range of offsets that the next batch will process.
        */
    -  private def constructNextBatch(): Unit = {
    -    // Check to see what new data is available.
    -    val hasNewData = {
    -      awaitProgressLock.lock()
    -      try {
    -        // Generate a map from each unique source to the next available offset.
    -        val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
    -          case s: Source =>
    -            updateStatusMessage(s"Getting offsets from $s")
    -            reportTimeTaken("getOffset") {
    -              (s, s.getOffset)
    -            }
    -          case s: MicroBatchReader =>
    -            updateStatusMessage(s"Getting offsets from $s")
    -            reportTimeTaken("setOffsetRange") {
    -              // Once v1 streaming source execution is gone, we can refactor this away.
    -              // For now, we set the range here to get the source to infer the available end offset,
    -              // get that offset, and then set the range again when we later execute.
    -              s.setOffsetRange(
    -                toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
    -                Optional.empty())
    -            }
    -
    -            val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
    -            (s, Option(currentOffset))
    -        }.toMap
    -        availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
    -
    -        if (dataAvailable) {
    -          true
    -        } else {
    -          noNewData = true
    -          false
    +  private def constructNextBatch(): Boolean = withProgressLocked {
    +    // If new data is already available that means this method has already been called before
    +    // and it must have already committed the offset range of next batch to the offset log.
    +    // Hence do nothing, just return true.
    +    if (isNewDataAvailable) return true
    --- End diff --
    
    This condition is possible when restarting. If, on restart, it finds that the last batch was planned but not completed, then new data is already available and committed to the offset log.
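
The restart case above can be illustrated with a minimal sketch. This is not the code in `MicroBatchExecution`: the `Planner` class, its single `Long` offset, and the `offsetLogWrites` counter are hypothetical simplifications, assumed only to show why an early `return true` makes the method idempotent once a batch has been planned.

```scala
object IdempotentBatchSketch {
  // Hypothetical, simplified stand-in for the executor's offset bookkeeping.
  // `committed` is the end of the last finished batch, `available` the end of the planned one.
  final class Planner(var committed: Long, var available: Long) {
    // Counts how often a new offset range would have been written to the offset log.
    var offsetLogWrites = 0

    def isNewDataAvailable: Boolean = available > committed

    // Returns true when a batch is ready to run.
    def constructNextBatch(latestSourceOffset: Long): Boolean = {
      // A batch is already planned (for example, recovered on restart): do nothing.
      if (isNewDataAvailable) return true

      if (latestSourceOffset > available) {
        available = latestSourceOffset
        offsetLogWrites += 1 // stands in for committing the range to the offset log
        true
      } else {
        false
      }
    }

    // Marks the planned batch as completed.
    def runBatch(): Unit = { committed = available }
  }
}
```

Constructing `new IdempotentBatchSketch.Planner(committed = 5L, available = 8L)` models a restart after a batch was planned but not completed: the first call to `constructNextBatch` returns true without touching the log, and repeated calls keep returning true until `runBatch()` is called.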


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    @brkyvz @zsxwing @jose-torres 


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185961801
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -128,40 +130,49 @@ class MicroBatchExecution(
        * Repeatedly attempts to run batches as data arrives.
        */
       protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    -    triggerExecutor.execute(() => {
    -      startTrigger()
     
    +    triggerExecutor.execute(() => {
           if (isActive) {
    +        var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run
    +        var currentBatchHadNewData = false // Whether the current batch had new data
    +
             reportTimeTaken("triggerExecution") {
    +          startTrigger()
    +
    +          // We'll do this initialization only once every start / restart
               if (currentBatchId < 0) {
    -            // We'll do this initialization only once
                 populateStartOffsets(sparkSessionForStream)
    -            sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    -            logDebug(s"Stream running from $committedOffsets to $availableOffsets")
    -          } else {
    -            constructNextBatch()
    +            logInfo(s"Stream started from $committedOffsets")
               }
    -          if (dataAvailable) {
    -            currentStatus = currentStatus.copy(isDataAvailable = true)
    +
    +          sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    +
    +          // Try to construct the next batch. This will return true only if the next batch is
    +          // ready and runnable. Note that the current batch may be runnable even without
    +          // new data to process as `constructNextBatch` may decide to run a batch for
    +          // state cleanup, etc. `isNewDataAvailable` will be updated to reflect whether new data
    +          // is available or not.
    +          currentBatchIsRunnable = constructNextBatch()
    +
    +          currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable)
    +          if (currentBatchIsRunnable) {
                 updateStatusMessage("Processing new data")
    +            // Remember whether the current batch has data or not. This will be required later
    --- End diff --
    
    I can argue that we are still processing the effect of the new data from the previous batch. This is something we can fix later if this is confusing.


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185677898
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -128,40 +130,49 @@ class MicroBatchExecution(
        * Repeatedly attempts to run batches as data arrives.
        */
       protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
    -    triggerExecutor.execute(() => {
    -      startTrigger()
     
    +    triggerExecutor.execute(() => {
           if (isActive) {
    +        var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run
    +        var currentBatchHadNewData = false // Whether the current batch had new data
    +
             reportTimeTaken("triggerExecution") {
    +          startTrigger()
    --- End diff --
    
    This used to be outside the timing block.
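
To make the consequence concrete, here is a small sketch of what moving a call inside a timed block changes. The two-argument `reportTimeTaken` and the `ManualClock` below are hypothetical stand-ins written for this illustration (the real method takes a string key, as in the diff); they only show that whatever runs inside the block is counted in the reported duration.

```scala
object TimingBlockSketch {
  // Hypothetical manual clock so the example is deterministic.
  final class ManualClock {
    var nowMs = 0L
    def advance(ms: Long): Unit = { nowMs += ms }
  }

  // Runs `body` and passes the elapsed clock time to `record`.
  def reportTimeTaken[T](clock: ManualClock, record: Long => Unit)(body: => T): T = {
    val start = clock.nowMs
    val result = body
    record(clock.nowMs - start)
    result
  }
}
```

If a hypothetical `startTrigger()` advances the clock by 3 ms and the rest of the trigger by 10 ms, calling `startTrigger()` before the block records 10 ms, while calling it inside the block records 13 ms.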


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185973467
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.execution.SparkPlan
    +
    +class WatermarkTracker extends Logging {
    +  private val operatorToWatermarkMap = mutable.HashMap[Int, Long]()
    +  private var watermarkMs: Long = 0
    +  private var updated = false
    +
    +  def setWatermark(newWatermarkMs: Long): Unit = synchronized {
    +    watermarkMs = newWatermarkMs
    +  }
    +
    +  def updateWatermark(executedPlan: SparkPlan): Unit = synchronized {
    +    val watermarkOperators = executedPlan.collect {
    --- End diff --
    
    I don't think we do anything that depends on the ordering of the `EventTimeWatermarkExec` operators. We take the minimum of the watermarks calculated across all of them, independent of their order.
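
As a sanity check of the order-independence claim, here is a minimal sketch. `OperatorStats` and `globalWatermark` are hypothetical names, and the sketch covers a single batch only: it does not model the index-keyed map that `WatermarkTracker` carries across batches.

```scala
object WatermarkMinSketch {
  // Hypothetical stand-in for what one watermark operator observed in a batch.
  final case class OperatorStats(maxEventTimeMs: Long, delayMs: Long)

  // Global watermark = minimum over operators of (max event time - delay).
  // Returns None when the plan has no watermark operators.
  def globalWatermark(ops: Seq[OperatorStats]): Option[Long] =
    if (ops.isEmpty) None
    else Some(ops.map(o => o.maxEventTimeMs - o.delayMs).min)
}
```

Because `min` is commutative and associative, every permutation of the same operators yields the same global watermark.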


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    Yeah. This refactoring was needed. Now it should be easier to make such changes. 


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185896682
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
    @@ -279,13 +279,10 @@ class FileStreamSinkSuite extends StreamTest {
           check() // nothing emitted yet
     
           addTimestamp(104, 123) // watermark = 90 before this, watermark = 123 - 10 = 113 after this
    -      check() // nothing emitted yet
    +      check((100L, 105L) -> 2L)  // no-data-batch emits results on 100-105,
    --- End diff --
    
    I would explicitly test with the flag on for this. If we ever want to turn it off, this test shouldn't fail.
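
A sketch of the idea: pin the flag inside the test so that its outcome does not depend on the default. Spark's SQL test suites have a `withSQLConf` helper for this purpose; the self-contained code below only imitates that pattern. The `conf` map, the `withConf` helper, the key `example.noDataMicroBatches.enabled`, and `emittedWindows` are all hypothetical and are not the names used in the PR.

```scala
import scala.collection.mutable

object FlagPinnedTestSketch {
  // Hypothetical key; the real config name is not shown in this thread.
  val NoDataBatchesKey = "example.noDataMicroBatches.enabled"

  // Hypothetical global configuration with the flag on by default.
  val conf: mutable.Map[String, String] = mutable.Map(NoDataBatchesKey -> "true")

  // Sets the given pairs for the duration of `body`, then restores the prior values.
  def withConf[T](pairs: (String, String)*)(body: => T): T = {
    val previous = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try {
      body
    } finally {
      previous.foreach {
        case (k, Some(v)) => conf(k) = v
        case (k, None) => conf.remove(k)
      }
    }
  }

  // Toy model of the test's observation: the 100-105 window is emitted
  // without new data only when no-data batches are enabled.
  def emittedWindows(): Seq[((Long, Long), Long)] =
    if (conf(NoDataBatchesKey).toBoolean) Seq((100L, 105L) -> 2L) else Seq.empty
}
```

Wrapping the assertion in `withConf(NoDataBatchesKey -> "true") { ... }` keeps it passing even if the default is later flipped to `"false"`, and the previous value is restored afterwards.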


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185961485
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -384,22 +363,21 @@ class MicroBatchExecution(
               commitLog.purge(currentBatchId - minLogEntriesToMaintain)
             }
           }
    +      noNewData = false
         } else {
    -      awaitProgressLock.lock()
    -      try {
    -        // Wake up any threads that are waiting for the stream to progress.
    -        awaitProgressLockCondition.signalAll()
    -      } finally {
    -        awaitProgressLock.unlock()
    -      }
    +      noNewData = true
    +      awaitProgressLockCondition.signalAll()
         }
    +    shouldConstructNextBatch
       }
     
       /**
        * Processes any data available between `availableOffsets` and `committedOffsets`.
        * @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with.
        */
       private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
    +    logDebug(s"Running batch $currentBatchId")
    +
    --- End diff --
    
    Well, a correct source implementation should obviously do that.


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185890809
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---
    @@ -266,93 +276,62 @@ class MicroBatchExecution(
       }
     
       /**
    -   * Queries all of the sources to see if any new data is available. When there is new data the
    -   * batchId counter is incremented and a new log entry is written with the newest offsets.
    +   * Attempts to construct the next batch based on whether new data is available and/or updated
    +   * metadata is such that another batch needs to be run for state clean up / additional output
    +   * generation even without new data. Returns true only if the next batch should be executed.
    +   *
    +   * Here is the high-level logic on how this constructs the next batch.
    +   * - Check each source whether new data is available
    +   * - Updated the query's metadata and check using the last execution whether there is any need
    +   *   to run another batch (for state clean up, etc.)
    +   * - If either of the above is true, then construct the next batch by committing to the offset
    +   *   log that range of offsets that the next batch will process.
        */
    -  private def constructNextBatch(): Unit = {
    -    // Check to see what new data is available.
    -    val hasNewData = {
    -      awaitProgressLock.lock()
    -      try {
    -        // Generate a map from each unique source to the next available offset.
    -        val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
    -          case s: Source =>
    -            updateStatusMessage(s"Getting offsets from $s")
    -            reportTimeTaken("getOffset") {
    -              (s, s.getOffset)
    -            }
    -          case s: MicroBatchReader =>
    -            updateStatusMessage(s"Getting offsets from $s")
    -            reportTimeTaken("setOffsetRange") {
    -              // Once v1 streaming source execution is gone, we can refactor this away.
    -              // For now, we set the range here to get the source to infer the available end offset,
    -              // get that offset, and then set the range again when we later execute.
    -              s.setOffsetRange(
    -                toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
    -                Optional.empty())
    -            }
    -
    -            val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
    -            (s, Option(currentOffset))
    -        }.toMap
    -        availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
    -
    -        if (dataAvailable) {
    -          true
    -        } else {
    -          noNewData = true
    -          false
    +  private def constructNextBatch(): Boolean = withProgressLocked {
    +    // If new data is already available that means this method has already been called before
    +    // and it must have already committed the offset range of next batch to the offset log.
    +    // Hence do nothing, just return true.
    +    if (isNewDataAvailable) return true
    +
    +    // Generate a map from each unique source to the next available offset.
    +    val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
    +      case s: Source =>
    +        updateStatusMessage(s"Getting offsets from $s")
    +        reportTimeTaken("getOffset") {
    +          (s, s.getOffset)
             }
    -      } finally {
    -        awaitProgressLock.unlock()
    -      }
    -    }
    -    if (hasNewData) {
    -      var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
    -      // Update the eventTime watermarks if we find any in the plan.
    -      if (lastExecution != null) {
    -        lastExecution.executedPlan.collect {
    -          case e: EventTimeWatermarkExec => e
    -        }.zipWithIndex.foreach {
    -          case (e, index) if e.eventTimeStats.value.count > 0 =>
    -            logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
    -            val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
    -            val prevWatermarkMs = watermarkMsMap.get(index)
    -            if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
    -              watermarkMsMap.put(index, newWatermarkMs)
    -            }
    -
    -          // Populate 0 if we haven't seen any data yet for this watermark node.
    -          case (_, index) =>
    -            if (!watermarkMsMap.isDefinedAt(index)) {
    -              watermarkMsMap.put(index, 0)
    -            }
    +      case s: MicroBatchReader =>
    +        updateStatusMessage(s"Getting offsets from $s")
    +        reportTimeTaken("setOffsetRange") {
    +          // Once v1 streaming source execution is gone, we can refactor this away.
    +          // For now, we set the range here to get the source to infer the available end offset,
    +          // get that offset, and then set the range again when we later execute.
    +          s.setOffsetRange(
    +            toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
    +            Optional.empty())
             }
     
    -        // Update the global watermark to the minimum of all watermark nodes.
    -        // This is the safest option, because only the global watermark is fault-tolerant. Making
    -        // it the minimum of all individual watermarks guarantees it will never advance past where
    -        // any individual watermark operator would be if it were in a plan by itself.
    -        if(!watermarkMsMap.isEmpty) {
    -          val newWatermarkMs = watermarkMsMap.minBy(_._2)._2
    -          if (newWatermarkMs > batchWatermarkMs) {
    -            logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
    -            batchWatermarkMs = newWatermarkMs
    -          } else {
    -            logDebug(
    -              s"Event time didn't move: $newWatermarkMs < " +
    -                s"$batchWatermarkMs")
    -          }
    -        }
    -      }
    -      offsetSeqMetadata = offsetSeqMetadata.copy(
    -        batchWatermarkMs = batchWatermarkMs,
    -        batchTimestampMs = triggerClock.getTimeMillis()) // Current batch timestamp in milliseconds
    +        val currentOffset = reportTimeTaken("getEndOffset") { s.getEndOffset() }
    +        (s, Option(currentOffset))
    +    }.toMap
    +    availableOffsets ++= latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)
    +
    +    // Update the query metadata
    +    offsetSeqMetadata = offsetSeqMetadata.copy(
    +      batchWatermarkMs = watermarkTracker.currentWatermark,
    +      batchTimestampMs = triggerClock.getTimeMillis())
    +
    +    // Check whether next batch should be constructed
    +    val lastExecutionRequiresAnotherBatch =
    +      sparkSession.sessionState.conf.streamingNoDataMicroBatchesEnabled &&
    --- End diff --
    
    nit: do we need to use `sparkSessionForStream`? I guess not


---



[GitHub] spark issue #21220: [SPARK-24157][SS] Enabled no-data batches in MicroBatchE...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/21220
  
    jenkins retest this please.


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21220#discussion_r185969637
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.execution.SparkPlan
    +
    +class WatermarkTracker extends Logging {
    +  private val operatorToWatermarkMap = mutable.HashMap[Int, Long]()
    +  private var watermarkMs: Long = 0
    +  private var updated = false
    +
    +  def setWatermark(newWatermarkMs: Long): Unit = synchronized {
    +    watermarkMs = newWatermarkMs
    +  }
    +
    +  def updateWatermark(executedPlan: SparkPlan): Unit = synchronized {
    +    val watermarkOperators = executedPlan.collect {
    +      case e: EventTimeWatermarkExec => e
    +    }
    +    if (watermarkOperators.isEmpty) return
    +
    +
    +    watermarkOperators.zipWithIndex.foreach {
    +      case (e, index) if e.eventTimeStats.value.count > 0 =>
    +        logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
    +        val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
    +        val prevWatermarkMs = operatorToWatermarkMap.get(index)
    +        if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
    +          operatorToWatermarkMap.put(index, newWatermarkMs)
    +        }
    +
    +      // Populate 0 if we haven't seen any data yet for this watermark node.
    +      case (_, index) =>
    +        if (!operatorToWatermarkMap.isDefinedAt(index)) {
    +          operatorToWatermarkMap.put(index, 0)
    +        }
    +    }
    +
    +    // Update the global watermark to the minimum of all watermark nodes.
    +    // This is the safest option, because only the global watermark is fault-tolerant. Making
    +    // it the minimum of all individual watermarks guarantees it will never advance past where
    +    // any individual watermark operator would be if it were in a plan by itself.
    +    val newWatermarkMs = operatorToWatermarkMap.minBy(_._2)._2
    +    if (newWatermarkMs > watermarkMs) {
    +      logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
    +      watermarkMs = newWatermarkMs
    +      updated = true
    +    } else {
    +      logDebug(s"Event time didn't move: $newWatermarkMs < $watermarkMs")
    +      updated = false
    +    }
    +  }
    +
    +  def watermarkUpdated: Boolean = synchronized { updated }
    --- End diff --
    
    No. I will remove it. 
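
For readers following the quoted diff, the "global watermark is the minimum of all watermark nodes" policy described in its comments can be sketched in isolation. This is a minimal, Spark-free illustration under stated assumptions, not the PR's code: `WatermarkPolicySketch`, `OperatorStats`, and the field names are hypothetical stand-ins for `WatermarkTracker`, `EventTimeWatermarkExec`, and its event-time stats.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the per-operator event-time stats read in the diff.
final case class OperatorStats(count: Long, maxEventTimeMs: Long, delayMs: Long)

// Hypothetical sketch: the global watermark is the minimum of the per-operator watermarks.
class WatermarkPolicySketch {
  private val operatorToWatermark = mutable.HashMap[Int, Long]()
  private var watermarkMs: Long = 0L

  def currentWatermark: Long = synchronized { watermarkMs }

  /** Applies one batch of stats; returns true only if the global watermark advanced. */
  def update(stats: Seq[OperatorStats]): Boolean = synchronized {
    if (stats.isEmpty) {
      false
    } else {
      stats.zipWithIndex.foreach {
        case (s, index) if s.count > 0 =>
          val candidate = s.maxEventTimeMs - s.delayMs
          // A per-operator watermark only ever moves forward.
          if (operatorToWatermark.get(index).forall(candidate > _)) {
            operatorToWatermark.put(index, candidate)
          }
        case (_, index) =>
          // No data seen yet for this operator: hold its watermark at 0.
          if (!operatorToWatermark.contains(index)) {
            operatorToWatermark.put(index, 0L)
          }
      }
      // Taking the minimum keeps the global value from passing any single operator.
      val candidateGlobal = operatorToWatermark.values.min
      if (candidateGlobal > watermarkMs) {
        watermarkMs = candidateGlobal
        true
      } else {
        false
      }
    }
  }
}
```

In this sketch, an operator that has not yet seen data pins the global watermark at 0, so one idle watermark node holds back the whole query until it observes events.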


---



[GitHub] spark pull request #21220: [SPARK-24157][SS] Enabled no-data batches in Micr...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21220


---
