Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2017/10/14 00:18:56 UTC

[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/19495

    [SPARK-22278][SS] Expose current event time watermark and current processing time in GroupState

    ## What changes were proposed in this pull request?
    
    Complex state-updating and/or timeout-handling logic in mapGroupsWithState functions may require making decisions based on the current event-time watermark and/or processing time. Currently, you can use the SQL function `current_timestamp` to get the current processing time, but it needs to be inserted in every row with a select and then passed through the encoder, which isn't efficient. Furthermore, there is no way to get the current watermark.
    
    This PR exposes both of them through the GroupState API.
    Additionally, it cleans up some of the GroupState docs.
    
    ## How was this patch tested?
    
    New unit tests
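
To illustrate the motivation above, here is a minimal, Spark-free sketch of timeout logic that decides from the current watermark and processing time. `TimeView`, `SessionLogic`, `FixedTimeView` and `shouldExpire` are hypothetical names made up for this example; only the accessor names `getCurrentWatermarkMs()` and `getCurrentProcessingTimeMs()` come from this pull request. In a real query, the `GroupState` handed to the mapGroupsWithState function would presumably play the role of `TimeView`.

```scala
// Hypothetical stand-in for the two accessors this PR adds to GroupState.
trait TimeView {
  def getCurrentWatermarkMs(): Long
  def getCurrentProcessingTimeMs(): Long
}

object SessionLogic {
  // Fixed-value view, so the logic can be exercised without a streaming query.
  final case class FixedTimeView(watermarkMs: Long, processingTimeMs: Long) extends TimeView {
    def getCurrentWatermarkMs(): Long = watermarkMs
    def getCurrentProcessingTimeMs(): Long = processingTimeMs
  }

  // A session expires when the watermark has passed its last event time by
  // more than gapMs, or when it has been idle in processing time for more
  // than maxIdleMs.
  def shouldExpire(
      time: TimeView,
      lastEventTimeMs: Long,
      lastSeenProcessingTimeMs: Long,
      gapMs: Long,
      maxIdleMs: Long): Boolean = {
    val lateByEventTime = time.getCurrentWatermarkMs() - lastEventTimeMs > gapMs
    val idleByProcessingTime =
      time.getCurrentProcessingTimeMs() - lastSeenProcessingTimeMs > maxIdleMs
    lateByEventTime || idleByProcessingTime
  }
}
```

Reading both timestamps from the state object, rather than from a column added to every row, is the efficiency point the description makes; the expiry rule itself is only an example.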

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-22278

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19495.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19495
    
----
commit c9a042e2f0228584f6a3f643cfac412c73ed98d7
Author: Tathagata Das <ta...@gmail.com>
Date:   2017-10-10T00:01:02Z

    Expose event time watermark in the GorupState

commit 67114ab59f5a8d79fbe66b7deb93869f656346b9
Author: Tathagata Das <ta...@gmail.com>
Date:   2017-10-14T00:16:08Z

    Exposed processing time

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r145283763
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala ---
    @@ -119,32 +116,34 @@ private[sql] class GroupStateImpl[S] private(
         timeoutTimestamp = timestampMs
       }
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
       override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
       }
     
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
       override def setTimeoutTimestamp(timestamp: Date): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(timestamp.getTime)
       }
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
       override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
       }
     
    +  override def getCurrentWatermarkMs(): Long = {
    +    if (!watermarkPresent) {
    +      throw new UnsupportedOperationException(
    +        "Cannot get event time watermark timestamp without enabling setting watermark before " +
    --- End diff --
    
    yes. agreed.


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r144936653
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---
    @@ -270,6 +270,60 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
         }
       }
     
    +  test("GroupState - getCurrentWatermarkMs") {
    +    def assertWrongTimeoutError(test: => Unit): Unit = {
    +      val e = intercept[UnsupportedOperationException] { test }
    +      assert(e.getMessage.contains(
    +        "Cannot get event time watermark timestamp without enabling event time timeout"))
    +    }
    +
    +    def streamingState(timeoutConf: GroupStateTimeout, watermark: Long): GroupState[Int] = {
    +      GroupStateImpl.createForStreaming(None, 1000, watermark, timeoutConf, hasTimedOut = false)
    +    }
    +
    +    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
    +      GroupStateImpl.createForBatch(timeoutConf)
    +    }
    +
    +    // Tests for getCurrentWatermarkMs in streaming queries
    +    assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentWatermarkMs() }
    +    assertWrongTimeoutError { streamingState(ProcessingTimeTimeout, 1000).getCurrentWatermarkMs() }
    +    assert(streamingState(EventTimeTimeout, 1000).getCurrentWatermarkMs() === 1000)
    +    assert(streamingState(EventTimeTimeout, 2000).getCurrentWatermarkMs() === 2000)
    +
    +    // Tests for getCurrentWatermarkMs in batch queries
    +    assertWrongTimeoutError { batchState(NoTimeout).getCurrentWatermarkMs() }
    +    assertWrongTimeoutError { batchState(ProcessingTimeTimeout).getCurrentWatermarkMs() }
    +    assert(batchState(EventTimeTimeout).getCurrentWatermarkMs() === -1)
    +  }
    +
    +  test("GroupState - getCurrentProcessingTimeMs") {
    +    def assertWrongTimeoutError(test: => Unit): Unit = {
    +      val e = intercept[UnsupportedOperationException] { test }
    +      assert(e.getMessage.contains(
    +        "Cannot get processing time timestamp without enabling processing time timeout"))
    +    }
    +
    +    def streamingState(timeoutConf: GroupStateTimeout, procTime: Long): GroupState[Int] = {
    +      GroupStateImpl.createForStreaming(None, procTime, -1, timeoutConf, hasTimedOut = false)
    +    }
    +
    +    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
    +      GroupStateImpl.createForBatch(timeoutConf)
    +    }
    +
    +    // Tests for getCurrentWatermarkMs in streaming queries
    +    assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentProcessingTimeMs() }
    +    assertWrongTimeoutError { streamingState(EventTimeTimeout, 1000).getCurrentProcessingTimeMs() }
    +    assert(streamingState(ProcessingTimeTimeout, 1000).getCurrentProcessingTimeMs() === 1000)
    +    assert(streamingState(ProcessingTimeTimeout, 2000).getCurrentProcessingTimeMs() === 2000)
    +
    +    // Tests for getCurrentWatermarkMs in batch queries
    +    assertWrongTimeoutError { batchState(NoTimeout).getCurrentProcessingTimeMs() }
    --- End diff --
    
    not actually using `getCurrentWatermarkMs`


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r144992271
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala ---
    @@ -187,7 +190,7 @@ private[sql] class GroupStateImpl[S] private(
         if (timeoutConf != EventTimeTimeout) {
           throw new UnsupportedOperationException(
             "Cannot set timeout timestamp without enabling event time timeout in " +
    -          "map/flatMapGroupsWithState")
    +          "map|flatMapGroupsWithState")
    --- End diff --
    
    `[map|flatMap]`


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r145290193
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala ---
    @@ -61,6 +61,10 @@ case class FlatMapGroupsWithStateExec(
     
       private val isTimeoutEnabled = timeoutConf != NoTimeout
       val stateManager = new FlatMapGroupsWithState_StateManager(stateEncoder, isTimeoutEnabled)
    +  val watermarkPresent = child.output.exists {
    --- End diff --
    
    Correction: no, it is not. When a watermark is not defined in the query, the eventTimeWatermark value is `Some(0)`.
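
The point of this comment can be sketched without Spark: if the watermark value is `Some(0)` even when no watermark is defined, the value alone cannot distinguish "not defined" from "defined, still at 0", so a separate flag is needed. `WatermarkGuard`, `QueryContext`, `currentWatermarkMs` and the error text below are hypothetical and only model the idea. That a defined watermark can also read 0 is an assumption made for this example.

```scala
object WatermarkGuard {
  // Hypothetical model of what the operator knows: the raw watermark value
  // and a flag computed from the query plan.
  final case class QueryContext(eventTimeWatermark: Option[Long], watermarkPresent: Boolean)

  // The guard consults the flag, never the value, because Some(0) is
  // ambiguous on its own.
  def currentWatermarkMs(ctx: QueryContext): Long = {
    if (!ctx.watermarkPresent) {
      throw new UnsupportedOperationException(
        "Illustrative message: no watermark is defined in this query")
    }
    ctx.eventTimeWatermark.getOrElse(0L)
  }
}
```

With the same `Some(0L)` value, the call returns 0 when the flag is true and throws when it is false.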


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    **[Test build #82823 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82823/testReport)** for PR 19495 at commit [`2f35889`](https://github.com/apache/spark/commit/2f358893a0dfff08c2175034219623121b55472e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r144963002
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala ---
    @@ -119,32 +115,39 @@ private[sql] class GroupStateImpl[S] private(
         timeoutTimestamp = timestampMs
       }
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
       override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
       }
     
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
       override def setTimeoutTimestamp(timestamp: Date): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(timestamp.getTime)
       }
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
       override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
       }
     
    +  override def getCurrentWatermarkMs(): Long = {
    +    if (timeoutConf != EventTimeTimeout) {
    +      throw new UnsupportedOperationException(
    +        "Cannot get event time watermark timestamp without enabling event time timeout in " +
    +          "[map/flatMap]GroupsWithState")
    --- End diff --
    
    right.


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82864/


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    **[Test build #82820 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82820/testReport)** for PR 19495 at commit [`52deee7`](https://github.com/apache/spark/commit/52deee70a280620cba35fecc7fccf2cd133e6197).


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r145282515
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala ---
    @@ -119,32 +116,34 @@ private[sql] class GroupStateImpl[S] private(
         timeoutTimestamp = timestampMs
       }
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
       override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
       }
     
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
       override def setTimeoutTimestamp(timestamp: Date): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(timestamp.getTime)
       }
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
       override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
       }
     
    +  override def getCurrentWatermarkMs(): Long = {
    +    if (!watermarkPresent) {
    +      throw new UnsupportedOperationException(
    +        "Cannot get event time watermark timestamp without enabling setting watermark before " +
    --- End diff --
    
    `without enabling setting watermark` sounds too convoluted. You probably meant `without setting watermark`?


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    **[Test build #82864 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82864/testReport)** for PR 19495 at commit [`0d788fe`](https://github.com/apache/spark/commit/0d788fe1d9f5cf430ac548bbda3947b3d879505b).


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r144972393
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---
    @@ -270,6 +270,60 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
         }
       }
     
    +  test("GroupState - getCurrentWatermarkMs") {
    +    def assertWrongTimeoutError(test: => Unit): Unit = {
    +      val e = intercept[UnsupportedOperationException] { test }
    +      assert(e.getMessage.contains(
    +        "Cannot get event time watermark timestamp without enabling event time timeout"))
    +    }
    +
    +    def streamingState(timeoutConf: GroupStateTimeout, watermark: Long): GroupState[Int] = {
    +      GroupStateImpl.createForStreaming(None, 1000, watermark, timeoutConf, hasTimedOut = false)
    +    }
    +
    +    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
    +      GroupStateImpl.createForBatch(timeoutConf)
    +    }
    +
    +    // Tests for getCurrentWatermarkMs in streaming queries
    +    assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentWatermarkMs() }
    +    assertWrongTimeoutError { streamingState(ProcessingTimeTimeout, 1000).getCurrentWatermarkMs() }
    +    assert(streamingState(EventTimeTimeout, 1000).getCurrentWatermarkMs() === 1000)
    +    assert(streamingState(EventTimeTimeout, 2000).getCurrentWatermarkMs() === 2000)
    +
    +    // Tests for getCurrentWatermarkMs in batch queries
    +    assertWrongTimeoutError { batchState(NoTimeout).getCurrentWatermarkMs() }
    +    assertWrongTimeoutError { batchState(ProcessingTimeTimeout).getCurrentWatermarkMs() }
    +    assert(batchState(EventTimeTimeout).getCurrentWatermarkMs() === -1)
    +  }
    +
    +  test("GroupState - getCurrentProcessingTimeMs") {
    +    def assertWrongTimeoutError(test: => Unit): Unit = {
    +      val e = intercept[UnsupportedOperationException] { test }
    +      assert(e.getMessage.contains(
    +        "Cannot get processing time timestamp without enabling processing time timeout"))
    +    }
    +
    +    def streamingState(timeoutConf: GroupStateTimeout, procTime: Long): GroupState[Int] = {
    +      GroupStateImpl.createForStreaming(None, procTime, -1, timeoutConf, hasTimedOut = false)
    +    }
    +
    +    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
    +      GroupStateImpl.createForBatch(timeoutConf)
    +    }
    +
    +    // Tests for getCurrentWatermarkMs in streaming queries
    +    assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentProcessingTimeMs() }
    +    assertWrongTimeoutError { streamingState(EventTimeTimeout, 1000).getCurrentProcessingTimeMs() }
    +    assert(streamingState(ProcessingTimeTimeout, 1000).getCurrentProcessingTimeMs() === 1000)
    +    assert(streamingState(ProcessingTimeTimeout, 2000).getCurrentProcessingTimeMs() === 2000)
    +
    +    // Tests for getCurrentWatermarkMs in batch queries
    +    assertWrongTimeoutError { batchState(NoTimeout).getCurrentProcessingTimeMs() }
    --- End diff --
    
    This is testing `getCurrentProcessingTimeMs`, so yeah, that's by design.


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    **[Test build #82869 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82869/testReport)** for PR 19495 at commit [`ed9d3a2`](https://github.com/apache/spark/commit/ed9d3a2e4fd4b1a0ec0e0ceb34419ede4e2303b8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19495


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r145285750
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
    @@ -205,92 +205,122 @@ trait GroupState[S] extends LogicalGroupState[S] {
       /** Get the state value as a scala Option. */
       def getOption: Option[S]
     
    -  /**
    -   * Update the value of the state. Note that `null` is not a valid value, and it throws
    -   * IllegalArgumentException.
    -   */
    -  @throws[IllegalArgumentException]("when updating with null")
    +  /** Update the value of the state. */
       def update(newState: S): Unit
     
       /** Remove this state. */
       def remove(): Unit
     
       /**
        * Whether the function has been called because the key has timed out.
    -   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
        */
       def hasTimedOut: Boolean
     
    +
       /**
        * Set the timeout duration in ms for this key.
        *
    -   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
       @throws[IllegalArgumentException]("if 'durationMs' is not positive")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
       @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutDuration(durationMs: Long): Unit
     
    +
       /**
        * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
        *
    -   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
       @throws[IllegalArgumentException]("if 'duration' is not a valid duration")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
       @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutDuration(duration: String): Unit
     
    -  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as milliseconds in epoch time.
        * This timestamp cannot be older than the current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException](
    +    "if 'timestampMs' is not positive or less than the current watermark in a streaming query")
    +  @throws[UnsupportedOperationException](
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestampMs: Long): Unit
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as milliseconds in epoch time and an additional
        * duration as a string (e.g. "1 hour", "2 days", etc.).
        * The final timestamp (including the additional duration) cannot be older than the
        * current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException](
    +    "if 'additionalDuration' is invalid or the final timeout timestamp is less than " +
    +      "the current watermark in a streaming query")
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit
     
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as a java.sql.Date.
        * This timestamp cannot be older than the current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as a java.sql.Date and an additional
        * duration as a string (e.g. "1 hour", "2 days", etc.).
        * The final timestamp (including the additional duration) cannot be older than the
        * current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *      `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestamp: java.sql.Date, additionalDuration: String): Unit
    +
    +
    +  /**
    +   * Get the current event time watermark as milliseconds in epoch time.
    +   *
    +   * @note In a streaming query, this can be called only when watermark is set before calling
    +   *       `[map/flatmap]GroupsWithState`. In a batch query, this method always returns -1.
    +   */
    +  @throws[UnsupportedOperationException](
    +    "if watermark has not been set before in [map|flatMap]GroupsWithState")
    +  def getCurrentWatermarkMs(): Long
    +
    +
    +  /**
    +   * Get the current event time watermark.
    --- End diff --
    
    It is constant across a trigger. But currently the output differs from `current_timestamp`, because in a batch query it returns -1. Fixing this.


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    **[Test build #82859 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82859/testReport)** for PR 19495 at commit [`037e036`](https://github.com/apache/spark/commit/037e03658b2102cb1025019b2a22535a5ecb3f50).


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r144992332
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
    @@ -205,92 +205,127 @@ trait GroupState[S] extends LogicalGroupState[S] {
       /** Get the state value as a scala Option. */
       def getOption: Option[S]
     
    -  /**
    -   * Update the value of the state. Note that `null` is not a valid value, and it throws
    -   * IllegalArgumentException.
    -   */
    -  @throws[IllegalArgumentException]("when updating with null")
    +  /** Update the value of the state. */
       def update(newState: S): Unit
     
       /** Remove this state. */
       def remove(): Unit
     
       /**
        * Whether the function has been called because the key has timed out.
    -   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
        */
       def hasTimedOut: Boolean
     
    +
    --- End diff --
    
    nit: extra line


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82751/
    Test PASSed.


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    **[Test build #82820 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82820/testReport)** for PR 19495 at commit [`52deee7`](https://github.com/apache/spark/commit/52deee70a280620cba35fecc7fccf2cd133e6197).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r145282877
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
    @@ -205,92 +205,122 @@ trait GroupState[S] extends LogicalGroupState[S] {
       /** Get the state value as a scala Option. */
       def getOption: Option[S]
     
    -  /**
    -   * Update the value of the state. Note that `null` is not a valid value, and it throws
    -   * IllegalArgumentException.
    -   */
    -  @throws[IllegalArgumentException]("when updating with null")
    +  /** Update the value of the state. */
       def update(newState: S): Unit
     
       /** Remove this state. */
       def remove(): Unit
     
       /**
        * Whether the function has been called because the key has timed out.
    -   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
        */
       def hasTimedOut: Boolean
     
    +
       /**
        * Set the timeout duration in ms for this key.
        *
    -   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
       @throws[IllegalArgumentException]("if 'durationMs' is not positive")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
       @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutDuration(durationMs: Long): Unit
     
    +
       /**
        * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
        *
    -   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
       @throws[IllegalArgumentException]("if 'duration' is not a valid duration")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
       @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutDuration(duration: String): Unit
     
    -  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as milliseconds in epoch time.
        * This timestamp cannot be older than the current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException](
    +    "if 'timestampMs' is not positive or less than the current watermark in a streaming query")
    +  @throws[UnsupportedOperationException](
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestampMs: Long): Unit
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as milliseconds in epoch time and an additional
        * duration as a string (e.g. "1 hour", "2 days", etc.).
        * The final timestamp (including the additional duration) cannot be older than the
        * current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException](
    +    "if 'additionalDuration' is invalid or the final timeout timestamp is less than " +
    +      "the current watermark in a streaming query")
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit
     
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as a java.sql.Date.
        * This timestamp cannot be older than the current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as a java.sql.Date and an additional
        * duration as a string (e.g. "1 hour", "2 days", etc.).
        * The final timestamp (including the additional duration) cannot be older than the
        * current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *      `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestamp: java.sql.Date, additionalDuration: String): Unit
    +
    +
    +  /**
    +   * Get the current event time watermark as milliseconds in epoch time.
    +   *
    +   * @note In a streaming query, this can be called only when watermark is set before calling
    +   *       `[map/flatmap]GroupsWithState`. In a batch query, this method always returns -1.
    +   */
    +  @throws[UnsupportedOperationException](
    +    "if watermark has not been set before in [map|flatMap]GroupsWithState")
    +  def getCurrentWatermarkMs(): Long
    +
    +
    +  /**
    +   * Get the current event time watermark.
    --- End diff --
    
    `Get the current processing time`? This is equivalent to having `current_timestamp()` as a column, right? Will it also be a constant value across the trigger, or will it be static for the duration of the trigger?
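
To make the question concrete, here is a minimal sketch of user logic that depends on the processing time reported by the state handle. `ProcTimeState` and `IdleSketch` are hypothetical names introduced for illustration and are not part of Spark. Only the getter name `getCurrentProcessingTimeMs` comes from the tests in this PR. If the reported value is fixed for the duration of a trigger, every key handled in that trigger is compared against the same instant.

```scala
// Stand-in exposing only the getter discussed here (an assumption for
// illustration, not Spark's GroupState).
trait ProcTimeState {
  def getCurrentProcessingTimeMs(): Long
}

object IdleSketch {
  // True when the key has been idle for longer than maxIdleMs, measured against
  // the processing time reported by the state handle rather than the wall clock.
  def isIdle(lastSeenMs: Long, maxIdleMs: Long, state: ProcTimeState): Boolean =
    state.getCurrentProcessingTimeMs() - lastSeenMs > maxIdleMs
}
```

Reading the time from the handle instead of calling `System.currentTimeMillis()` keeps the decision independent of when each task happens to run within the trigger.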


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r145202164
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
    @@ -205,92 +205,127 @@ trait GroupState[S] extends LogicalGroupState[S] {
       /** Get the state value as a scala Option. */
       def getOption: Option[S]
     
    -  /**
    -   * Update the value of the state. Note that `null` is not a valid value, and it throws
    -   * IllegalArgumentException.
    -   */
    -  @throws[IllegalArgumentException]("when updating with null")
    +  /** Update the value of the state. */
       def update(newState: S): Unit
     
       /** Remove this state. */
       def remove(): Unit
     
       /**
        * Whether the function has been called because the key has timed out.
    -   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
        */
       def hasTimedOut: Boolean
     
    +
       /**
        * Set the timeout duration in ms for this key.
        *
    -   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
       @throws[IllegalArgumentException]("if 'durationMs' is not positive")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
       @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutDuration(durationMs: Long): Unit
     
    +
       /**
        * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
        *
    -   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
       @throws[IllegalArgumentException]("if 'duration' is not a valid duration")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
       @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutDuration(duration: String): Unit
     
    -  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as milliseconds in epoch time.
        * This timestamp cannot be older than the current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException](
    +    "if 'timestampMs' is not positive or less than the current watermark in a streaming query")
    +  @throws[UnsupportedOperationException](
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestampMs: Long): Unit
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as milliseconds in epoch time and an additional
        * duration as a string (e.g. "1 hour", "2 days", etc.).
        * The final timestamp (including the additional duration) cannot be older than the
        * current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException](
    +    "if 'additionalDuration' is invalid or the final timeout timestamp is less than " +
    +      "the current watermark in a streaming query")
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit
     
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as a java.sql.Date.
        * This timestamp cannot be older than the current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as a java.sql.Date and an additional
        * duration as a string (e.g. "1 hour", "2 days", etc.).
        * The final timestamp (including the additional duration) cannot be older than the
        * current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *      `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestamp: java.sql.Date, additionalDuration: String): Unit
    +
    +
    +  /**
    +   * Get the current event time watermark as milliseconds in epoch time.
    +   *
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method returns -1 when calling inside a batch query.
    +   */
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
    +  def getCurrentWatermarkMs(): Long
    +
    --- End diff --
    
    intentional, easier to read


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    **[Test build #82869 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82869/testReport)** for PR 19495 at commit [`ed9d3a2`](https://github.com/apache/spark/commit/ed9d3a2e4fd4b1a0ec0e0ceb34419ede4e2303b8).


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82869/
    Test PASSed.


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    Merged build finished. Test PASSed.


---





[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r145283802
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala ---
    @@ -61,6 +61,10 @@ case class FlatMapGroupsWithStateExec(
     
       private val isTimeoutEnabled = timeoutConf != NoTimeout
       val stateManager = new FlatMapGroupsWithState_StateManager(stateEncoder, isTimeoutEnabled)
    +  val watermarkPresent = child.output.exists {
    --- End diff --
    
    yes. agreed.



---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    **[Test build #82848 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82848/testReport)** for PR 19495 at commit [`9aed5eb`](https://github.com/apache/spark/commit/9aed5eb41b8a1792e1b1870590c7e739eb227f1f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82848/
    Test PASSed.


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r144989072
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---
    @@ -270,6 +270,60 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
         }
       }
     
    +  test("GroupState - getCurrentWatermarkMs") {
    +    def assertWrongTimeoutError(test: => Unit): Unit = {
    +      val e = intercept[UnsupportedOperationException] { test }
    +      assert(e.getMessage.contains(
    +        "Cannot get event time watermark timestamp without enabling event time timeout"))
    +    }
    +
    +    def streamingState(timeoutConf: GroupStateTimeout, watermark: Long): GroupState[Int] = {
    +      GroupStateImpl.createForStreaming(None, 1000, watermark, timeoutConf, hasTimedOut = false)
    +    }
    +
    +    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
    +      GroupStateImpl.createForBatch(timeoutConf)
    +    }
    +
    +    // Tests for getCurrentWatermarkMs in streaming queries
    +    assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentWatermarkMs() }
    +    assertWrongTimeoutError { streamingState(ProcessingTimeTimeout, 1000).getCurrentWatermarkMs() }
    +    assert(streamingState(EventTimeTimeout, 1000).getCurrentWatermarkMs() === 1000)
    +    assert(streamingState(EventTimeTimeout, 2000).getCurrentWatermarkMs() === 2000)
    +
    +    // Tests for getCurrentWatermarkMs in batch queries
    +    assertWrongTimeoutError { batchState(NoTimeout).getCurrentWatermarkMs() }
    +    assertWrongTimeoutError { batchState(ProcessingTimeTimeout).getCurrentWatermarkMs() }
    +    assert(batchState(EventTimeTimeout).getCurrentWatermarkMs() === -1)
    +  }
    +
    +  test("GroupState - getCurrentProcessingTimeMs") {
    +    def assertWrongTimeoutError(test: => Unit): Unit = {
    +      val e = intercept[UnsupportedOperationException] { test }
    +      assert(e.getMessage.contains(
    +        "Cannot get processing time timestamp without enabling processing time timeout"))
    +    }
    +
    +    def streamingState(timeoutConf: GroupStateTimeout, procTime: Long): GroupState[Int] = {
    +      GroupStateImpl.createForStreaming(None, procTime, -1, timeoutConf, hasTimedOut = false)
    +    }
    +
    +    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
    +      GroupStateImpl.createForBatch(timeoutConf)
    +    }
    +
    +    // Tests for getCurrentWatermarkMs in streaming queries
    +    assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentProcessingTimeMs() }
    +    assertWrongTimeoutError { streamingState(EventTimeTimeout, 1000).getCurrentProcessingTimeMs() }
    +    assert(streamingState(ProcessingTimeTimeout, 1000).getCurrentProcessingTimeMs() === 1000)
    +    assert(streamingState(ProcessingTimeTimeout, 2000).getCurrentProcessingTimeMs() === 2000)
    +
    +    // Tests for getCurrentWatermarkMs in batch queries
    +    assertWrongTimeoutError { batchState(NoTimeout).getCurrentProcessingTimeMs() }
    --- End diff --
    
    the comment above says otherwise. that's why I was confused


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r144991277
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---
    @@ -270,6 +270,60 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
         }
       }
     
    +  test("GroupState - getCurrentWatermarkMs") {
    +    def assertWrongTimeoutError(test: => Unit): Unit = {
    +      val e = intercept[UnsupportedOperationException] { test }
    +      assert(e.getMessage.contains(
    +        "Cannot get event time watermark timestamp without enabling event time timeout"))
    +    }
    +
    +    def streamingState(timeoutConf: GroupStateTimeout, watermark: Long): GroupState[Int] = {
    +      GroupStateImpl.createForStreaming(None, 1000, watermark, timeoutConf, hasTimedOut = false)
    +    }
    +
    +    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
    +      GroupStateImpl.createForBatch(timeoutConf)
    +    }
    +
    +    // Tests for getCurrentWatermarkMs in streaming queries
    +    assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentWatermarkMs() }
    +    assertWrongTimeoutError { streamingState(ProcessingTimeTimeout, 1000).getCurrentWatermarkMs() }
    +    assert(streamingState(EventTimeTimeout, 1000).getCurrentWatermarkMs() === 1000)
    +    assert(streamingState(EventTimeTimeout, 2000).getCurrentWatermarkMs() === 2000)
    +
    +    // Tests for getCurrentWatermarkMs in batch queries
    +    assertWrongTimeoutError { batchState(NoTimeout).getCurrentWatermarkMs() }
    +    assertWrongTimeoutError { batchState(ProcessingTimeTimeout).getCurrentWatermarkMs() }
    +    assert(batchState(EventTimeTimeout).getCurrentWatermarkMs() === -1)
    +  }
    +
    +  test("GroupState - getCurrentProcessingTimeMs") {
    +    def assertWrongTimeoutError(test: => Unit): Unit = {
    +      val e = intercept[UnsupportedOperationException] { test }
    +      assert(e.getMessage.contains(
    +        "Cannot get processing time timestamp without enabling processing time timeout"))
    +    }
    +
    +    def streamingState(timeoutConf: GroupStateTimeout, procTime: Long): GroupState[Int] = {
    +      GroupStateImpl.createForStreaming(None, procTime, -1, timeoutConf, hasTimedOut = false)
    +    }
    +
    +    def batchState(timeoutConf: GroupStateTimeout): GroupState[Any] = {
    +      GroupStateImpl.createForBatch(timeoutConf)
    +    }
    +
    +    // Tests for getCurrentWatermarkMs in streaming queries
    +    assertWrongTimeoutError { streamingState(NoTimeout, 1000).getCurrentProcessingTimeMs() }
    +    assertWrongTimeoutError { streamingState(EventTimeTimeout, 1000).getCurrentProcessingTimeMs() }
    +    assert(streamingState(ProcessingTimeTimeout, 1000).getCurrentProcessingTimeMs() === 1000)
    +    assert(streamingState(ProcessingTimeTimeout, 2000).getCurrentProcessingTimeMs() === 2000)
    +
    +    // Tests for getCurrentWatermarkMs in batch queries
    +    assertWrongTimeoutError { batchState(NoTimeout).getCurrentProcessingTimeMs() }
    --- End diff --
    
    Argh, my bad. I didn't realize your comment was about the code comment.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r144972248
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
    @@ -205,92 +205,127 @@ trait GroupState[S] extends LogicalGroupState[S] {
       /** Get the state value as a scala Option. */
       def getOption: Option[S]
     
    -  /**
    -   * Update the value of the state. Note that `null` is not a valid value, and it throws
    -   * IllegalArgumentException.
    -   */
    -  @throws[IllegalArgumentException]("when updating with null")
    +  /** Update the value of the state. */
       def update(newState: S): Unit
     
       /** Remove this state. */
       def remove(): Unit
     
       /**
        * Whether the function has been called because the key has timed out.
    -   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
        */
       def hasTimedOut: Boolean
     
    +
       /**
        * Set the timeout duration in ms for this key.
        *
    -   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
       @throws[IllegalArgumentException]("if 'durationMs' is not positive")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
       @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutDuration(durationMs: Long): Unit
     
    +
       /**
        * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
        *
    -   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
       @throws[IllegalArgumentException]("if 'duration' is not a valid duration")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
       @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutDuration(duration: String): Unit
     
    -  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as milliseconds in epoch time.
        * This timestamp cannot be older than the current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException](
    +    "if 'timestampMs' is not positive or less than the current watermark in a streaming query")
    +  @throws[UnsupportedOperationException](
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestampMs: Long): Unit
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as milliseconds in epoch time and an additional
        * duration as a string (e.g. "1 hour", "2 days", etc.).
        * The final timestamp (including the additional duration) cannot be older than the
        * current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException](
    +    "if 'additionalDuration' is invalid or the final timeout timestamp is less than " +
    +      "the current watermark in a streaming query")
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit
     
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as a java.sql.Date.
        * This timestamp cannot be older than the current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as a java.sql.Date and an additional
        * duration as a string (e.g. "1 hour", "2 days", etc.).
        * The final timestamp (including the additional duration) cannot be older than the
        * current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *      `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestamp: java.sql.Date, additionalDuration: String): Unit
    +
    +
    +  /**
    +   * Get the current event time watermark as milliseconds in epoch time.
    +   *
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method returns -1 when calling inside a batch query.
    +   */
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
    +  def getCurrentWatermarkMs(): Long
    --- End diff --
    
    Introducing a new API for that purpose is a bigger question. In general, if we want to expose a general piece of information that may be used in any operation, we introduce it as a SQL function like `current_timestamp`. That is orthogonal to the requirement here; we can still add one if we find the need for it.
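
    For illustration, a state-update function could branch on the watermark returned by the getter in the diff above. This is only a minimal sketch: `WatermarkView`, `Reading`, and `LateFilter` are hypothetical stand-ins so the snippet runs without Spark; only the method name `getCurrentWatermarkMs()` and the -1 value for batch queries come from the diff.

```scala
// Hypothetical stand-in for the slice of GroupState used here (not the Spark trait).
trait WatermarkView {
  def getCurrentWatermarkMs(): Long
}

// Hypothetical input record with an event-time field in epoch milliseconds.
final case class Reading(eventTimeMs: Long, value: Double)

object LateFilter {
  // Keep only readings whose event time is at or after the current watermark.
  // A watermark of -1 (the batch-query value in the diff) keeps every
  // reading with a non-negative event time.
  def onTime(readings: Seq[Reading], state: WatermarkView): Seq[Reading] = {
    val watermarkMs = state.getCurrentWatermarkMs()
    readings.filter(_.eventTimeMs >= watermarkMs)
  }
}
```

    With the real API, the same decision would be made inside the function passed to `[map|flatMap]GroupsWithState`, without first selecting a timestamp column into every row.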


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r144992239
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala ---
    @@ -119,32 +115,39 @@ private[sql] class GroupStateImpl[S] private(
         timeoutTimestamp = timestampMs
       }
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
       override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
       }
     
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
       override def setTimeoutTimestamp(timestamp: Date): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(timestamp.getTime)
       }
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
       override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
       }
     
    +  override def getCurrentWatermarkMs(): Long = {
    +    if (timeoutConf != EventTimeTimeout) {
    +      throw new UnsupportedOperationException(
    +        "Cannot get event time watermark timestamp without enabling event time timeout in " +
    +          "[map|flatMap]GroupsWithState")
    +    }
    +    eventTimeWatermarkMs
    +  }
    +
    +  override def getCurrentProcessingTimeMs(): Long = {
    +    if (timeoutConf != ProcessingTimeTimeout) {
    +      throw new UnsupportedOperationException(
    +        "Cannot get processing time timestamp without enabling processing time timeout in " +
    +          "map|flatMap]GroupsWithState")
    --- End diff --
    
    nit: `[`


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r144736325
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala ---
    @@ -119,32 +115,39 @@ private[sql] class GroupStateImpl[S] private(
         timeoutTimestamp = timestampMs
       }
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
       override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
       }
     
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
       override def setTimeoutTimestamp(timestamp: Date): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(timestamp.getTime)
       }
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
       override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
       }
     
    +  override def getCurrentWatermarkMs(): Long = {
    +    if (timeoutConf != EventTimeTimeout) {
    +      throw new UnsupportedOperationException(
    +        "Cannot get event time watermark timestamp without enabling event time timeout in " +
    +          "[map/flatMap]GroupsWithState")
    +    }
    +    eventTimeWatermarkMs
    +  }
    +
    +  override def getCurrentProcessingTimeMs(): Long = {
    +    if (timeoutConf != ProcessingTimeTimeout) {
    +      throw new UnsupportedOperationException(
    +        "Cannot get processing time timestamp without enabling processing time timeout in " +
    +          "map/flatMap]GroupsWithState")
    --- End diff --
    
    `map/flatMap]` -> `[map/flatMap]`


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r145283034
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---
    @@ -1086,4 +1181,24 @@ object FlatMapGroupsWithStateSuite {
         override def metrics: StateStoreMetrics = new StateStoreMetrics(map.size, 0, Map.empty)
         override def hasCommitted: Boolean = true
       }
    +
    +  def assertCanGetProcessingTime(predicate: => Boolean): Unit = {
    +    if (!predicate) throw new TestFailedException("Could not get processing time", 20)
    --- End diff --
    
    what is the `20`?


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    LGTM!


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82823/
    Test PASSed.


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    LGTM. Just a bunch of cosmetic nits, but fine to address them separately


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r144935264
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
    @@ -205,92 +205,127 @@ trait GroupState[S] extends LogicalGroupState[S] {
       /** Get the state value as a scala Option. */
       def getOption: Option[S]
     
    -  /**
    -   * Update the value of the state. Note that `null` is not a valid value, and it throws
    -   * IllegalArgumentException.
    -   */
    -  @throws[IllegalArgumentException]("when updating with null")
    +  /** Update the value of the state. */
       def update(newState: S): Unit
     
       /** Remove this state. */
       def remove(): Unit
     
       /**
        * Whether the function has been called because the key has timed out.
    -   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
        */
       def hasTimedOut: Boolean
     
    +
       /**
        * Set the timeout duration in ms for this key.
        *
    -   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
       @throws[IllegalArgumentException]("if 'durationMs' is not positive")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
       @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutDuration(durationMs: Long): Unit
     
    +
       /**
        * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
        *
    -   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
       @throws[IllegalArgumentException]("if 'duration' is not a valid duration")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
       @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutDuration(duration: String): Unit
     
    -  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as milliseconds in epoch time.
        * This timestamp cannot be older than the current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException](
    +    "if 'timestampMs' is not positive or less than the current watermark in a streaming query")
    +  @throws[UnsupportedOperationException](
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestampMs: Long): Unit
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as milliseconds in epoch time and an additional
        * duration as a string (e.g. "1 hour", "2 days", etc.).
        * The final timestamp (including the additional duration) cannot be older than the
        * current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException](
    +    "if 'additionalDuration' is invalid or the final timeout timestamp is less than " +
    +      "the current watermark in a streaming query")
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit
     
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as a java.sql.Date.
        * This timestamp cannot be older than the current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as a java.sql.Date and an additional
        * duration as a string (e.g. "1 hour", "2 days", etc.).
        * The final timestamp (including the additional duration) cannot be older than the
        * current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *      `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestamp: java.sql.Date, additionalDuration: String): Unit
    +
    +
    +  /**
    +   * Get the current event time watermark as milliseconds in epoch time.
    +   *
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method returns -1 when calling inside a batch query.
    +   */
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
    +  def getCurrentWatermarkMs(): Long
    --- End diff --
    
    I'm wondering whether we should expose these individually or one layer deeper, as Flink and Kafka Streams do, where they are wrapped in a `ProcessingContext` object.
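
    To make the two shapes concrete, here is a rough sketch of both options side by side. Every name in it (`ProcessingContext` as a case class, `FlatTimeApi`, `NestedTimeApi`, `ContextAdapter`) is hypothetical and not part of the PR, and the sketch ignores the per-timeout checks that the getters in the diff perform.

```scala
// Hypothetical wrapper object; no such class or accessor exists in the PR.
final case class ProcessingContext(currentWatermarkMs: Long, currentProcessingTimeMs: Long)

// Option 1 (the shape in the diff): individual getters on the state handle.
trait FlatTimeApi {
  def getCurrentWatermarkMs(): Long
  def getCurrentProcessingTimeMs(): Long
}

// Option 2 (the alternative raised here): one accessor returning a context object.
trait NestedTimeApi {
  def context: ProcessingContext
}

object ContextAdapter {
  // Build the nested view on top of the flat one, reading both values on each access.
  def wrap(flat: FlatTimeApi): NestedTimeApi = new NestedTimeApi {
    def context: ProcessingContext =
      ProcessingContext(flat.getCurrentWatermarkMs(), flat.getCurrentProcessingTimeMs())
  }
}
```

    The nested form leaves room to add more fields later without widening the state interface, at the cost of one extra hop for callers.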


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    **[Test build #82823 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82823/testReport)** for PR 19495 at commit [`2f35889`](https://github.com/apache/spark/commit/2f358893a0dfff08c2175034219623121b55472e).


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82859/
    Test PASSed.


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    **[Test build #82859 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82859/testReport)** for PR 19495 at commit [`037e036`](https://github.com/apache/spark/commit/037e03658b2102cb1025019b2a22535a5ecb3f50).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    **[Test build #82751 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82751/testReport)** for PR 19495 at commit [`67114ab`](https://github.com/apache/spark/commit/67114ab59f5a8d79fbe66b7deb93869f656346b9).


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r145282388
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala ---
    @@ -61,6 +61,10 @@ case class FlatMapGroupsWithStateExec(
     
       private val isTimeoutEnabled = timeoutConf != NoTimeout
       val stateManager = new FlatMapGroupsWithState_StateManager(stateEncoder, isTimeoutEnabled)
    +  val watermarkPresent = child.output.exists {
    --- End diff --
    
    this is cleaner, but doesn't `eventTimeWatermark.isDefined` imply that the watermark is present?


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r144936080
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala ---
    @@ -119,32 +115,39 @@ private[sql] class GroupStateImpl[S] private(
         timeoutTimestamp = timestampMs
       }
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
       override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
       }
     
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
       override def setTimeoutTimestamp(timestamp: Date): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(timestamp.getTime)
       }
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
       override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
         checkTimeoutTimestampAllowed()
         setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
       }
     
    +  override def getCurrentWatermarkMs(): Long = {
    +    if (timeoutConf != EventTimeTimeout) {
    +      throw new UnsupportedOperationException(
    +        "Cannot get event time watermark timestamp without enabling event time timeout in " +
    +          "[map/flatMap]GroupsWithState")
    --- End diff --
    
    uber nit: should we be consistent with `/` and `|`? Half the places use `[map/flatMap]` and the other half use `[map|flatMap]`. I prefer `|`.
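
    For illustration, the guard in the diff above can be sketched without Spark on the classpath. `TimeoutConf` and `WatermarkGuard` are hypothetical stand-ins introduced only for this sketch, not the real `GroupStateImpl`; the exception type and message wording follow the diff, with the separator written as `|`.

```scala
// Hypothetical stand-ins for the timeout configurations (not Spark's real classes).
sealed trait TimeoutConf
case object NoTimeout extends TimeoutConf
case object ProcessingTimeTimeout extends TimeoutConf
case object EventTimeTimeout extends TimeoutConf

// Minimal model of the guard: the watermark is readable only when
// event time timeout is the configured timeout mode.
final class WatermarkGuard(timeoutConf: TimeoutConf, watermarkMs: Long) {
  def getCurrentWatermarkMs(): Long = {
    if (timeoutConf != EventTimeTimeout) {
      throw new UnsupportedOperationException(
        "Cannot get event time watermark timestamp without enabling event time timeout in " +
          "[map|flatMap]GroupsWithState")
    }
    watermarkMs
  }
}
```

    In this model, constructing the guard with `NoTimeout` or `ProcessingTimeTimeout` makes the call throw, while `EventTimeTimeout` returns the stored watermark.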


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r144992379
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
    @@ -205,92 +205,127 @@ trait GroupState[S] extends LogicalGroupState[S] {
       /** Get the state value as a scala Option. */
       def getOption: Option[S]
     
    -  /**
    -   * Update the value of the state. Note that `null` is not a valid value, and it throws
    -   * IllegalArgumentException.
    -   */
    -  @throws[IllegalArgumentException]("when updating with null")
    +  /** Update the value of the state. */
       def update(newState: S): Unit
     
       /** Remove this state. */
       def remove(): Unit
     
       /**
        * Whether the function has been called because the key has timed out.
    -   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
        */
       def hasTimedOut: Boolean
     
    +
       /**
        * Set the timeout duration in ms for this key.
        *
    -   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
       @throws[IllegalArgumentException]("if 'durationMs' is not positive")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
       @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutDuration(durationMs: Long): Unit
     
    +
       /**
        * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
        *
    -   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Processing time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
       @throws[IllegalArgumentException]("if 'duration' is not a valid duration")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
       @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutDuration(duration: String): Unit
     
    -  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as milliseconds in epoch time.
        * This timestamp cannot be older than the current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException](
    +    "if 'timestampMs' is not positive or less than the current watermark in a streaming query")
    +  @throws[UnsupportedOperationException](
    +    "if processing time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestampMs: Long): Unit
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as milliseconds in epoch time and an additional
        * duration as a string (e.g. "1 hour", "2 days", etc.).
        * The final timestamp (including the additional duration) cannot be older than the
        * current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException](
    +    "if 'additionalDuration' is invalid or the final timeout timestamp is less than " +
    +      "the current watermark in a streaming query")
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit
     
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as a java.sql.Date.
        * This timestamp cannot be older than the current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
     
    -  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    -  @throws[IllegalStateException]("when state is either not initialized, or already removed")
    -  @throws[UnsupportedOperationException](
    -    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
    +
       /**
        * Set the timeout timestamp for this key as a java.sql.Date and an additional
        * duration as a string (e.g. "1 hour", "2 days", etc.).
        * The final timestamp (including the additional duration) cannot be older than the
        * current watermark.
        *
    -   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *      `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method has no side effect when used in a batch query.
        */
    +  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
       def setTimeoutTimestamp(timestamp: java.sql.Date, additionalDuration: String): Unit
    +
    +
    +  /**
    +   * Get the current event time watermark as milliseconds in epoch time.
    +   *
    +   * @note [[GroupStateTimeout Event time timeout]] must be enabled in
    +   *       `[map/flatmap]GroupsWithState` for calling this method.
    +   * @note This method returns -1 when calling inside a batch query.
    +   */
    +  @throws[UnsupportedOperationException](
    +    "if event time timeout has not been enabled in [map|flatMap]GroupsWithState")
    +  def getCurrentWatermarkMs(): Long
    +
    --- End diff --
    
    nit: extra line
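
    As a rough sketch of the kind of decision the documented methods allow, the helper below picks an event-time timeout that is never older than the current watermark. `TimeoutPolicy`, `nextTimeoutMs`, and `gapMs` are hypothetical names for this example only; the only assumptions taken from the docs in the diff are that the timeout timestamp cannot be older than the watermark and that `getCurrentWatermarkMs()` returns -1 inside a batch query.

```scala
// Hypothetical helper, not part of Spark: computes the value that a state
// function could pass to GroupState.setTimeoutTimestamp(timestampMs: Long).
object TimeoutPolicy {
  // lastEventMs: latest event time seen for the key (epoch ms)
  // watermarkMs: value of getCurrentWatermarkMs(); -1 in a batch query per the docs
  // gapMs:       how long the key may stay idle before timing out
  def nextTimeoutMs(lastEventMs: Long, watermarkMs: Long, gapMs: Long): Long = {
    require(gapMs > 0, "gapMs must be positive")
    // Start from whichever is later, so the result is never older than the watermark.
    math.max(lastEventMs, watermarkMs) + gapMs
  }
}
```

    Inside a state function this could be used as `state.setTimeoutTimestamp(TimeoutPolicy.nextTimeoutMs(lastEventMs, state.getCurrentWatermarkMs(), gapMs))`. With the -1 watermark of a batch query, the result falls back to `lastEventMs + gapMs`.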


---



[GitHub] spark pull request #19495: [SPARK-22278][SS] Expose current event time water...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19495#discussion_r145202185
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala ---
    @@ -205,92 +205,127 @@ trait GroupState[S] extends LogicalGroupState[S] {
       /** Get the state value as a scala Option. */
       def getOption: Option[S]
     
    -  /**
    -   * Update the value of the state. Note that `null` is not a valid value, and it throws
    -   * IllegalArgumentException.
    -   */
    -  @throws[IllegalArgumentException]("when updating with null")
    +  /** Update the value of the state. */
       def update(newState: S): Unit
     
       /** Remove this state. */
       def remove(): Unit
     
       /**
        * Whether the function has been called because the key has timed out.
    -   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
    +   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithState`.
        */
       def hasTimedOut: Boolean
     
    +
    --- End diff --
    
    That's intentional. It visually makes the code easier to read.


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    **[Test build #82848 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82848/testReport)** for PR 19495 at commit [`9aed5eb`](https://github.com/apache/spark/commit/9aed5eb41b8a1792e1b1870590c7e739eb227f1f).


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    **[Test build #82864 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82864/testReport)** for PR 19495 at commit [`0d788fe`](https://github.com/apache/spark/commit/0d788fe1d9f5cf430ac548bbda3947b3d879505b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    **[Test build #82751 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82751/testReport)** for PR 19495 at commit [`67114ab`](https://github.com/apache/spark/commit/67114ab59f5a8d79fbe66b7deb93869f656346b9).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19495: [SPARK-22278][SS] Expose current event time watermark an...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19495
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82820/


---
