Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2017/03/20 11:05:05 UTC

[GitHub] spark pull request #17361: [SPARK-20030][SS][WIP]Event-time-based timeout fo...

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/17361

    [SPARK-20030][SS][WIP]Event-time-based timeout for MapGroupsWithState

    ## What changes were proposed in this pull request?
    
    Adds an event-time-based timeout. The user sets the timeout timestamp directly using `KeyedState.setTimeoutTimestamp`. A key times out when the watermark crosses its timeout timestamp.
    
    ## How was this patch tested?
    Unit tests
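
The timeout rule in the description above can be modelled without Spark. The following is a minimal sketch, not the patch's implementation: `GroupState`, `hasTimedOut` and `timedOutKeys` are hypothetical names, and treating "crosses" as a strict greater-than comparison is an assumption.

```scala
// Illustrative model only; these types are NOT Spark's KeyedState API.
object EventTimeTimeoutSketch {

  /** Per-key state with an optional timeout timestamp in event-time milliseconds. */
  final case class GroupState(value: Long, timeoutTimestampMs: Option[Long])

  /** Assumption: a key times out once the watermark is strictly past its timestamp. */
  def hasTimedOut(state: GroupState, watermarkMs: Long): Boolean =
    state.timeoutTimestampMs.exists(ts => watermarkMs > ts)

  /** Returns the keys whose timeout timestamp the watermark has crossed. */
  def timedOutKeys(states: Map[String, GroupState], watermarkMs: Long): Set[String] =
    states.collect { case (key, s) if hasTimedOut(s, watermarkMs) => key }.toSet
}
```

A key with no timeout timestamp never times out in this model, which matches the idea that the user opts in per key by setting the timestamp.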

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-20030

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17361.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #17361
    
----
commit 3f77c01f255e548fb4b80a82b180f6154db53b97
Author: Tathagata Das <ta...@gmail.com>
Date:   2017-03-20T11:05:17Z

    First draft of event time timeout

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17361#discussion_r107309220
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
    @@ -147,49 +147,68 @@ object UnsupportedOperationChecker {
               throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
                 "streaming DataFrames/Datasets")
     
    -        // mapGroupsWithState: Allowed only when no aggregation + Update output mode
    -        case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState =>
    -          if (collectStreamingAggregates(plan).isEmpty) {
    -            if (outputMode != InternalOutputModes.Update) {
    -              throwError("mapGroupsWithState is not supported with " +
    -                s"$outputMode output mode on a streaming DataFrame/Dataset")
    -            } else {
    -              // Allowed when no aggregation + Update output mode
    -            }
    -          } else {
    -            throwError("mapGroupsWithState is not supported with aggregation " +
    -              "on a streaming DataFrame/Dataset")
    -          }
    -
    -        // flatMapGroupsWithState without aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && collectStreamingAggregates(plan).isEmpty =>
    -          m.outputMode match {
    -            case InternalOutputModes.Update =>
    -              if (outputMode != InternalOutputModes.Update) {
    -                throwError("flatMapGroupsWithState in update mode is not supported with " +
    +        // mapGroupsWithState and flatMapGroupsWithState
    +        case m: FlatMapGroupsWithState if m.isStreaming =>
    +
    +          // Check compatibility with output modes and aggregations in query
    +          val aggsAfterFlatMapGroups = collectStreamingAggregates(plan)
    +
    +          if (m.isMapGroupsWithState) {                       // check mapGroupsWithState
    +            // allowed only in update query output mode and without aggregation
    +            if (aggsAfterFlatMapGroups.nonEmpty) {
    +              throwError(
    +                "mapGroupsWithState is not supported with aggregation " +
    +                  "on a streaming DataFrame/Dataset")
    +            } else if (outputMode != InternalOutputModes.Update) {
    +              throwError(
    +                "mapGroupsWithState is not supported with " +
                       s"$outputMode output mode on a streaming DataFrame/Dataset")
    +            }
    +          } else {                                           // check flatMapGroupsWithState
    +            if (aggsAfterFlatMapGroups.isEmpty) {
    +              // flatMapGroupsWithState without aggregation: operation's output mode must
    +              // match query output mode
    +              m.outputMode match {
    +                case InternalOutputModes.Update if outputMode != InternalOutputModes.Update =>
    +                  throwError(
    +                    "flatMapGroupsWithState in update mode is not supported with " +
    +                      s"$outputMode output mode on a streaming DataFrame/Dataset")
    +
    +                case InternalOutputModes.Append if outputMode != InternalOutputModes.Append =>
    +                  throwError(
    +                    "flatMapGroupsWithState in append mode is not supported with " +
    +                      s"$outputMode output mode on a streaming DataFrame/Dataset")
    +
    +                case _ =>
                   }
    -            case InternalOutputModes.Append =>
    -              if (outputMode != InternalOutputModes.Append) {
    -                throwError("flatMapGroupsWithState in append mode is not supported with " +
    -                  s"$outputMode output mode on a streaming DataFrame/Dataset")
    +            } else {
    +              // flatMapGroupsWithState with aggregation: update operation mode not allowed, and
    +              // *groupsWithState after aggregation not allowed
    +              if (m.outputMode == InternalOutputModes.Update) {
    +                throwError(
    +                  "flatMapGroupsWithState in update mode is not supported with " +
    +                    "aggregation on a streaming DataFrame/Dataset")
    +              } else if (collectStreamingAggregates(m).nonEmpty) {
    +                throwError(
    +                  "flatMapGroupsWithState in append mode is not supported after " +
    +                    s"aggregation on a streaming DataFrame/Dataset")
                   }
    +            }
               }
     
    -        // flatMapGroupsWithState(Update) with aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && m.outputMode == InternalOutputModes.Update
    -            && collectStreamingAggregates(plan).nonEmpty =>
    -          throwError("flatMapGroupsWithState in update mode is not supported with " +
    -            "aggregation on a streaming DataFrame/Dataset")
    -
    -        // flatMapGroupsWithState(Append) with aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && m.outputMode == InternalOutputModes.Append
    -            && collectStreamingAggregates(m).nonEmpty =>
    -          throwError("flatMapGroupsWithState in append mode is not supported after " +
    -            s"aggregation on a streaming DataFrame/Dataset")
    +          // Check compatibility with timeout configs
    +          if (m.timeout == EventTimeTimeout) {
    +            // With event time timeout, watermark must be defined.
    +            val watermarkAttributes = m.child.output.collect {
    +              case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
    +            }
    +            if (watermarkAttributes.isEmpty) {
    +              throwError(
    +                "Event time timeout is not supported in a [map|flatMap]GroupsWithState " +
    --- End diff --
    
    Aah, I get it. The Apache docs do have hyphenation.
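
For readers following the quoted diff, the restructured checks can be summarised as one pure function. This is a simplified sketch under stated assumptions: `Op`, `Mode` and `check` are hypothetical stand-ins for the logical-plan classes, and the returned strings are paraphrases rather than the checker's real error messages (the watermark message is truncated in the quoted diff and is not reproduced).

```scala
// Simplified model of the checks in the quoted diff; not Spark's actual classes.
object GroupsWithStateCheckSketch {
  sealed trait Mode
  case object Append extends Mode
  case object Update extends Mode
  case object Complete extends Mode

  /** Hypothetical flattened view of what the checker inspects. */
  final case class Op(
      isMapGroupsWithState: Boolean,
      operatorMode: Mode,          // output mode passed to flatMapGroupsWithState
      aggregationInQuery: Boolean, // stands in for collectStreamingAggregates(plan).nonEmpty
      aggregationBelowOp: Boolean, // stands in for collectStreamingAggregates(m).nonEmpty
      eventTimeTimeout: Boolean,
      watermarkDefined: Boolean)

  /** Returns the first violated rule, or None when the operator is allowed. */
  def check(op: Op, queryMode: Mode): Option[String] = {
    val modeError: Option[String] =
      if (op.isMapGroupsWithState) {
        // Allowed only without aggregation and in Update query output mode.
        if (op.aggregationInQuery) Some("mapGroupsWithState with aggregation")
        else if (queryMode != Update) Some(s"mapGroupsWithState with $queryMode output mode")
        else None
      } else if (!op.aggregationInQuery) {
        // Without aggregation, the operator's mode must match the query's mode.
        op.operatorMode match {
          case Update if queryMode != Update => Some("update-mode operator in non-update query")
          case Append if queryMode != Append => Some("append-mode operator in non-append query")
          case _ => None
        }
      } else {
        // With aggregation: no update-mode operator, and no operator after an aggregation.
        if (op.operatorMode == Update) Some("update-mode operator with aggregation")
        else if (op.aggregationBelowOp) Some("append-mode operator after aggregation")
        else None
      }
    // The timeout check runs after the mode checks, as in the diff.
    val timeoutError: Option[String] =
      if (op.eventTimeTimeout && !op.watermarkDefined) Some("event-time timeout requires a watermark")
      else None
    modeError.orElse(timeoutError)
  }
}
```

Returning `Option[String]` instead of throwing keeps the sketch easy to test; the real checker calls `throwError` at the first violation.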




[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17361#discussion_r107307253
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
    @@ -147,49 +147,68 @@ object UnsupportedOperationChecker {
               throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
                 "streaming DataFrames/Datasets")
     
    -        // mapGroupsWithState: Allowed only when no aggregation + Update output mode
    -        case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState =>
    -          if (collectStreamingAggregates(plan).isEmpty) {
    -            if (outputMode != InternalOutputModes.Update) {
    -              throwError("mapGroupsWithState is not supported with " +
    -                s"$outputMode output mode on a streaming DataFrame/Dataset")
    -            } else {
    -              // Allowed when no aggregation + Update output mode
    -            }
    -          } else {
    -            throwError("mapGroupsWithState is not supported with aggregation " +
    -              "on a streaming DataFrame/Dataset")
    -          }
    -
    -        // flatMapGroupsWithState without aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && collectStreamingAggregates(plan).isEmpty =>
    -          m.outputMode match {
    -            case InternalOutputModes.Update =>
    -              if (outputMode != InternalOutputModes.Update) {
    -                throwError("flatMapGroupsWithState in update mode is not supported with " +
    +        // mapGroupsWithState and flatMapGroupsWithState
    +        case m: FlatMapGroupsWithState if m.isStreaming =>
    +
    +          // Check compatibility with output modes and aggregations in query
    +          val aggsAfterFlatMapGroups = collectStreamingAggregates(plan)
    +
    +          if (m.isMapGroupsWithState) {                       // check mapGroupsWithState
    +            // allowed only in update query output mode and without aggregation
    +            if (aggsAfterFlatMapGroups.nonEmpty) {
    +              throwError(
    +                "mapGroupsWithState is not supported with aggregation " +
    +                  "on a streaming DataFrame/Dataset")
    +            } else if (outputMode != InternalOutputModes.Update) {
    +              throwError(
    +                "mapGroupsWithState is not supported with " +
                       s"$outputMode output mode on a streaming DataFrame/Dataset")
    +            }
    +          } else {                                           // check flatMapGroupsWithState
    +            if (aggsAfterFlatMapGroups.isEmpty) {
    +              // flatMapGroupsWithState without aggregation: operation's output mode must
    +              // match query output mode
    +              m.outputMode match {
    +                case InternalOutputModes.Update if outputMode != InternalOutputModes.Update =>
    +                  throwError(
    +                    "flatMapGroupsWithState in update mode is not supported with " +
    +                      s"$outputMode output mode on a streaming DataFrame/Dataset")
    +
    +                case InternalOutputModes.Append if outputMode != InternalOutputModes.Append =>
    +                  throwError(
    +                    "flatMapGroupsWithState in append mode is not supported with " +
    +                      s"$outputMode output mode on a streaming DataFrame/Dataset")
    +
    +                case _ =>
                   }
    -            case InternalOutputModes.Append =>
    -              if (outputMode != InternalOutputModes.Append) {
    -                throwError("flatMapGroupsWithState in append mode is not supported with " +
    -                  s"$outputMode output mode on a streaming DataFrame/Dataset")
    +            } else {
    +              // flatMapGroupsWithState with aggregation: update operation mode not allowed, and
    +              // *groupsWithState after aggregation not allowed
    +              if (m.outputMode == InternalOutputModes.Update) {
    +                throwError(
    +                  "flatMapGroupsWithState in update mode is not supported with " +
    +                    "aggregation on a streaming DataFrame/Dataset")
    +              } else if (collectStreamingAggregates(m).nonEmpty) {
    +                throwError(
    +                  "flatMapGroupsWithState in append mode is not supported after " +
    +                    s"aggregation on a streaming DataFrame/Dataset")
                   }
    +            }
               }
     
    -        // flatMapGroupsWithState(Update) with aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && m.outputMode == InternalOutputModes.Update
    -            && collectStreamingAggregates(plan).nonEmpty =>
    -          throwError("flatMapGroupsWithState in update mode is not supported with " +
    -            "aggregation on a streaming DataFrame/Dataset")
    -
    -        // flatMapGroupsWithState(Append) with aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && m.outputMode == InternalOutputModes.Append
    -            && collectStreamingAggregates(m).nonEmpty =>
    -          throwError("flatMapGroupsWithState in append mode is not supported after " +
    -            s"aggregation on a streaming DataFrame/Dataset")
    +          // Check compatibility with timeout configs
    +          if (m.timeout == EventTimeTimeout) {
    +            // With event time timeout, watermark must be defined.
    +            val watermarkAttributes = m.child.output.collect {
    +              case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
    +            }
    +            if (watermarkAttributes.isEmpty) {
    +              throwError(
    +                "Event time timeout is not supported in a [map|flatMap]GroupsWithState " +
    --- End diff --
    
    Are we? I didn't know there was a policy. I am fine with hyphenating.




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #75017 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75017/testReport)** for PR 17361 at commit [`9c9668b`](https://github.com/apache/spark/commit/9c9668b9e2d76e3ef56f6a8094c76b5a38178d1b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #75001 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75001/testReport)** for PR 17361 at commit [`6759165`](https://github.com/apache/spark/commit/6759165f9b6d26c87b94e7acc40914ae4ca37a89).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17361#discussion_r107307589
  
    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java ---
    @@ -34,9 +32,20 @@
     @InterfaceStability.Evolving
     public class KeyedStateTimeout {
     
    -  /** Timeout based on processing time.  */
    +  /**
    +   * Timeout based on processing time. The duration of timeout can be set for each group in
    +   * `map/flatMapGroupsWithState` by calling `KeyedState.setTimeoutDuration()`.
    +   */
       public static KeyedStateTimeout ProcessingTimeTimeout() { return ProcessingTimeTimeout$.MODULE$; }
    --- End diff --
    
    It's just that if someone does `import KeyedStateTimeout._`, the code boils down to
    `flatMapGroupsWithState(Update, ProcessingTime) { ... }` with no reference to a timeout.
    
    Fine either way.
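
The naming concern above can be seen at the call site. The sketch below uses stand-in definitions, not the real Spark classes: the `flatMapGroupsWithState` stub and the shorter `ProcessingTime` name are hypothetical and exist only to contrast the two spellings after a wildcard import.

```scala
// Stand-in definitions for illustration; not the real Spark classes.
object NamingSketch {
  sealed trait KeyedStateTimeout
  object KeyedStateTimeout {
    case object ProcessingTimeTimeout extends KeyedStateTimeout // name used in the diff
    case object ProcessingTime extends KeyedStateTimeout        // hypothetical shorter name
  }

  /** Hypothetical stub that only reports which arguments it was given. */
  def flatMapGroupsWithState(outputMode: String, timeout: KeyedStateTimeout): String =
    s"outputMode=$outputMode, timeout=$timeout"

  import KeyedStateTimeout._

  // After the wildcard import, the shorter name gives no hint that it configures a timeout.
  val shortCall: String = flatMapGroupsWithState("Update", ProcessingTime)

  // The longer name keeps the purpose visible at the call site.
  val longCall: String = flatMapGroupsWithState("Update", ProcessingTimeTimeout)
}
```

Without the wildcard import, both spellings would be qualified as `KeyedStateTimeout.…`, which is where the `Timeout` suffix looks redundant.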




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/75010/
    Test FAILed.




[GitHub] spark issue #17361: [SPARK-20030][SS][WIP]Event-time-based timeout for MapGr...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74919/
    Test PASSed.




[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17361#discussion_r107307617
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---
    @@ -519,6 +588,52 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
         )
       }
     
    +  test("flatMapGroupsWithState - streaming with event time timeout") {
    +    // Function to maintain the max event time
    +    // Returns the max event time in the state, or -1 if the state was removed by timeout
    +    val stateFunc = (
    +        key: String,
    +        values: Iterator[(String, Long)],
    +        state: KeyedState[Long]) => {
    +      val timeoutDelay = 5
    +      if (key != "a") {
    +        Iterator.empty
    +      } else {
    +        if (state.hasTimedOut) {
    +          state.remove()
    +          Iterator((key, -1))
    +        } else {
    +          val valuesSeq = values.toSeq
    +          val maxEventTime = math.max(valuesSeq.map(_._2).max, state.getOption.getOrElse(0L))
    +          val timeoutTimestampMs = maxEventTime + timeoutDelay
    +          state.update(maxEventTime)
    +          state.setTimeoutTimestamp(timeoutTimestampMs * 1000)
    +          Iterator((key, maxEventTime.toInt))
    +        }
    +      }
    +    }
    +    val inputData = MemoryStream[(String, Int)]
    +    val result =
    +      inputData.toDS
    +        .select($"_1".as("key"), $"_2".cast("timestamp").as("eventTime"))
    +        .withWatermark("eventTime", "10 seconds")
    +        .as[(String, Long)]
    +        .groupByKey[String]((x: (String, Long)) => x._1)
    +        .flatMapGroupsWithState[Long, (String, Int)](Update, EventTimeTimeout)(stateFunc)
    --- End diff --
    
    As long as they aren't required, it's okay.




[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17361#discussion_r107304133
  
    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java ---
    @@ -34,9 +32,20 @@
     @InterfaceStability.Evolving
     public class KeyedStateTimeout {
     
    -  /** Timeout based on processing time.  */
    +  /**
    +   * Timeout based on processing time. The duration of timeout can be set for each group in
    +   * `map/flatMapGroupsWithState` by calling `KeyedState.setTimeoutDuration()`.
    +   */
       public static KeyedStateTimeout ProcessingTimeTimeout() { return ProcessingTimeTimeout$.MODULE$; }
    --- End diff --
    
    Nit: I'd consider removing the `Timeout` here, as it's kind of redundant.




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #74969 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74969/testReport)** for PR 17361 at commit [`2c5592c`](https://github.com/apache/spark/commit/2c5592cd12843bd532ea79f8705240d73623ae05).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #17361: [SPARK-20030][SS][WIP]Event-time-based timeout for MapGr...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #74881 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74881/testReport)** for PR 17361 at commit [`3f77c01`](https://github.com/apache/spark/commit/3f77c01f255e548fb4b80a82b180f6154db53b97).




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #75010 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75010/testReport)** for PR 17361 at commit [`64b6abf`](https://github.com/apache/spark/commit/64b6abf89189fe8a3f3d2f58f341a9b58a95a6d2).




[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17361#discussion_r107304618
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
    @@ -147,49 +147,68 @@ object UnsupportedOperationChecker {
               throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
                 "streaming DataFrames/Datasets")
     
    -        // mapGroupsWithState: Allowed only when no aggregation + Update output mode
    -        case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState =>
    -          if (collectStreamingAggregates(plan).isEmpty) {
    -            if (outputMode != InternalOutputModes.Update) {
    -              throwError("mapGroupsWithState is not supported with " +
    -                s"$outputMode output mode on a streaming DataFrame/Dataset")
    -            } else {
    -              // Allowed when no aggregation + Update output mode
    -            }
    -          } else {
    -            throwError("mapGroupsWithState is not supported with aggregation " +
    -              "on a streaming DataFrame/Dataset")
    -          }
    -
    -        // flatMapGroupsWithState without aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && collectStreamingAggregates(plan).isEmpty =>
    -          m.outputMode match {
    -            case InternalOutputModes.Update =>
    -              if (outputMode != InternalOutputModes.Update) {
    -                throwError("flatMapGroupsWithState in update mode is not supported with " +
    +        // mapGroupsWithState and flatMapGroupsWithState
    +        case m: FlatMapGroupsWithState if m.isStreaming =>
    +
    +          // Check compatibility with output modes and aggregations in query
    +          val aggsAfterFlatMapGroups = collectStreamingAggregates(plan)
    +
    +          if (m.isMapGroupsWithState) {                       // check mapGroupsWithState
    +            // allowed only in update query output mode and without aggregation
    +            if (aggsAfterFlatMapGroups.nonEmpty) {
    +              throwError(
    +                "mapGroupsWithState is not supported with aggregation " +
    +                  "on a streaming DataFrame/Dataset")
    +            } else if (outputMode != InternalOutputModes.Update) {
    +              throwError(
    +                "mapGroupsWithState is not supported with " +
                       s"$outputMode output mode on a streaming DataFrame/Dataset")
    +            }
    +          } else {                                           // check latMapGroupsWithState
    +            if (aggsAfterFlatMapGroups.isEmpty) {
    +              // flatMapGroupsWithState without aggregation: operation's output mode must
    +              // match query output mode
    +              m.outputMode match {
    +                case InternalOutputModes.Update if outputMode != InternalOutputModes.Update =>
    +                  throwError(
    +                    "flatMapGroupsWithState in update mode is not supported with " +
    +                      s"$outputMode output mode on a streaming DataFrame/Dataset")
    +
    +                case InternalOutputModes.Append if outputMode != InternalOutputModes.Append =>
    +                  throwError(
    +                    "flatMapGroupsWithState in append mode is not supported with " +
    +                      s"$outputMode output mode on a streaming DataFrame/Dataset")
    +
    +                case _ =>
                   }
    -            case InternalOutputModes.Append =>
    -              if (outputMode != InternalOutputModes.Append) {
    -                throwError("flatMapGroupsWithState in append mode is not supported with " +
    -                  s"$outputMode output mode on a streaming DataFrame/Dataset")
    +            } else {
    +              // flatMapGroupsWithState with aggregation: update operation mode not allowed, and
    +              // *groupsWithState after aggregation not allowed
    +              if (m.outputMode == InternalOutputModes.Update) {
    +                throwError(
    +                  "flatMapGroupsWithState in update mode is not supported with " +
    +                    "aggregation on a streaming DataFrame/Dataset")
    +              } else if (collectStreamingAggregates(m).nonEmpty) {
    +                throwError(
    +                  "flatMapGroupsWithState in append mode is not supported after " +
    +                    s"aggregation on a streaming DataFrame/Dataset")
                   }
    +            }
               }
     
    -        // flatMapGroupsWithState(Update) with aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && m.outputMode == InternalOutputModes.Update
    -            && collectStreamingAggregates(plan).nonEmpty =>
    -          throwError("flatMapGroupsWithState in update mode is not supported with " +
    -            "aggregation on a streaming DataFrame/Dataset")
    -
    -        // flatMapGroupsWithState(Append) with aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && m.outputMode == InternalOutputModes.Append
    -            && collectStreamingAggregates(m).nonEmpty =>
    -          throwError("flatMapGroupsWithState in append mode is not supported after " +
    -            s"aggregation on a streaming DataFrame/Dataset")
    +          // Check compatibility with timeout configs
    +          if (m.timeout == EventTimeTimeout) {
    +            // With event time timeout, watermark must be defined.
    +            val watermarkAttributes = m.child.output.collect {
    +              case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
    +            }
    +            if (watermarkAttributes.isEmpty) {
    +              throwError(
    +                "Event time timeout is not supported in a [map|flatMap]GroupsWithState " +
    +                  "without watermark. Use '[Dataset/DataFrame].withWatermark()' to " +
    --- End diff --
    
    I would consider making this an affirmative statement: "You must define a watermark on a DataFrame in order to use event-time-based timeouts".
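
As an illustration of the affirmative wording suggested above, here is a minimal sketch in plain Java, outside Spark. The class name, the `DELAY_KEY` constant, and the map-based column metadata are all hypothetical stand-ins for the analyzer's attribute metadata; the tail of the message after "Use ... withWatermark()" is also an assumption, since the quoted diff is cut off at that point.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the analyzer check; this is not Spark's actual code.
final class WatermarkCheckSketch {
    // Assumed metadata key marking a watermarked column (illustrative only).
    static final String DELAY_KEY = "watermarkDelayMs";

    /** Throws with an affirmative message if no column carries watermark metadata. */
    static void requireWatermark(List<? extends Map<String, ?>> columnMetadata) {
        boolean hasWatermark = columnMetadata.stream()
            .anyMatch(md -> md.containsKey(DELAY_KEY));
        if (!hasWatermark) {
            // States what the user must do, rather than what is unsupported.
            throw new IllegalStateException(
                "You must define a watermark on a DataFrame/Dataset in order to use "
                + "event-time-based timeouts in [map|flatMap]GroupsWithState. "
                + "Use '[Dataset/DataFrame].withWatermark()' to define one.");
        }
    }
}
```

The point of the rewording is that the message leads with the required action, so a user who hits it knows the fix without reading the rest.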




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #74971 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74971/testReport)** for PR 17361 at commit [`0523aaf`](https://github.com/apache/spark/commit/0523aaf4ba653d5515f563172869b8999494ca01).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17361#discussion_r107304893
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---
    @@ -519,6 +588,52 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
         )
       }
     
    +  test("flatMapGroupsWithState - streaming with event time timeout") {
    +    // Function to maintain the max event time
    +    // Returns the max event time in the state, or -1 if the state was removed by timeout
    +    val stateFunc = (
    +        key: String,
    +        values: Iterator[(String, Long)],
    +        state: KeyedState[Long]) => {
    +      val timeoutDelay = 5
    +      if (key != "a") {
    +        Iterator.empty
    +      } else {
    +        if (state.hasTimedOut) {
    +          state.remove()
    +          Iterator((key, -1))
    +        } else {
    +          val valuesSeq = values.toSeq
    +          val maxEventTime = math.max(valuesSeq.map(_._2).max, state.getOption.getOrElse(0L))
    +          val timeoutTimestampMs = maxEventTime + timeoutDelay
    +          state.update(maxEventTime)
    +          state.setTimeoutTimestamp(timeoutTimestampMs * 1000)
    +          Iterator((key, maxEventTime.toInt))
    +        }
    +      }
    +    }
    +    val inputData = MemoryStream[(String, Int)]
    +    val result =
    +      inputData.toDS
    +        .select($"_1".as("key"), $"_2".cast("timestamp").as("eventTime"))
    +        .withWatermark("eventTime", "10 seconds")
    +        .as[(String, Long)]
    +        .groupByKey[String]((x: (String, Long)) => x._1)
    +        .flatMapGroupsWithState[Long, (String, Int)](Update, EventTimeTimeout)(stateFunc)
    --- End diff --
    
    Are these types here just for testing? (i.e., we didn't break inference, right?)
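
For readers following the quoted test, the logic of its state function can be sketched outside Spark as a small Java class. This is only a simulation under stated assumptions: the class and method names are hypothetical, and the rule that a key times out once the watermark strictly exceeds the timeout timestamp is an assumption about what "crosses" means in the PR description.

```java
// Hypothetical, Spark-free simulation of the per-key logic in the quoted test.
final class EventTimeTimeoutSketch {
    static final long NO_TIMEOUT = -1L;
    static final long TIMEOUT_DELAY_SEC = 5;   // mirrors `timeoutDelay = 5` in the test

    private Long maxEventTimeSec = null;       // null means "no state for this key"
    private long timeoutTimestampMs = NO_TIMEOUT;

    /** New data arrived: keep the max event time and re-arm the timeout. */
    long onData(long... eventTimesSec) {
        long max = (maxEventTimeSec == null) ? 0L : maxEventTimeSec;
        for (long t : eventTimesSec) {
            max = Math.max(max, t);
        }
        maxEventTimeSec = max;
        // As in the test: seconds plus delay, converted to milliseconds.
        timeoutTimestampMs = (max + TIMEOUT_DELAY_SEC) * 1000;
        return max;
    }

    /** Assumption: the key times out once the watermark strictly passes the timestamp. */
    boolean hasTimedOut(long watermarkMs) {
        return maxEventTimeSec != null
            && timeoutTimestampMs != NO_TIMEOUT
            && watermarkMs > timeoutTimestampMs;
    }

    /** Mirrors the timed-out branch: drop the state and emit -1. */
    long onTimeout() {
        maxEventTimeSec = null;
        timeoutTimestampMs = NO_TIMEOUT;
        return -1L;
    }
}
```

With an event at 10 seconds, the timeout timestamp becomes 15000 ms, so under the assumption above the key is still live at a watermark of 15000 ms and times out at 15001 ms.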




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #75017 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75017/testReport)** for PR 17361 at commit [`9c9668b`](https://github.com/apache/spark/commit/9c9668b9e2d76e3ef56f6a8094c76b5a38178d1b).




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #74971 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74971/testReport)** for PR 17361 at commit [`0523aaf`](https://github.com/apache/spark/commit/0523aaf4ba653d5515f563172869b8999494ca01).




[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17361#discussion_r107307806
  
    --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java ---
    @@ -34,9 +32,20 @@
     @InterfaceStability.Evolving
     public class KeyedStateTimeout {
     
    -  /** Timeout based on processing time.  */
    +  /**
    +   * Timeout based on processing time. The duration of timeout can be set for each group in
    +   * `map/flatMapGroupsWithState` by calling `KeyedState.setTimeoutDuration()`.
    +   */
       public static KeyedStateTimeout ProcessingTimeTimeout() { return ProcessingTimeTimeout$.MODULE$; }
    --- End diff --
    
    I'd probably still remove it.




[GitHub] spark issue #17361: [SPARK-20030][SS][WIP]Event-time-based timeout for MapGr...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/75017/
    Test PASSed.




[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17361#discussion_r107307367
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/KeyedStateImpl.scala ---
    @@ -17,37 +17,45 @@
     
     package org.apache.spark.sql.execution.streaming
     
    +import java.sql.Date
    +
     import org.apache.commons.lang3.StringUtils
     
    -import org.apache.spark.sql.streaming.KeyedState
    +import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout}
    +import org.apache.spark.sql.execution.streaming.KeyedStateImpl._
    +import org.apache.spark.sql.streaming.{KeyedState, KeyedStateTimeout}
     import org.apache.spark.unsafe.types.CalendarInterval
     
    +
     /**
      * Internal implementation of the [[KeyedState]] interface. Methods are not thread-safe.
      * @param optionalValue Optional value of the state
      * @param batchProcessingTimeMs Processing time of current batch, used to calculate timestamp
      *                              for processing time timeouts
    - * @param isTimeoutEnabled Whether timeout is enabled. This will be used to check whether the user
    - *                         is allowed to configure timeouts.
    + * @param timeoutConf   Type of timeout configured. Based on this, different operations will
    + *                        be supported.
    --- End diff --
    
    nit: indent is inconsistent




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74971/
    Test PASSed.




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17361#discussion_r107304196
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
    @@ -147,49 +147,68 @@ object UnsupportedOperationChecker {
               throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
                 "streaming DataFrames/Datasets")
     
    -        // mapGroupsWithState: Allowed only when no aggregation + Update output mode
    -        case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState =>
    -          if (collectStreamingAggregates(plan).isEmpty) {
    -            if (outputMode != InternalOutputModes.Update) {
    -              throwError("mapGroupsWithState is not supported with " +
    -                s"$outputMode output mode on a streaming DataFrame/Dataset")
    -            } else {
    -              // Allowed when no aggregation + Update output mode
    -            }
    -          } else {
    -            throwError("mapGroupsWithState is not supported with aggregation " +
    -              "on a streaming DataFrame/Dataset")
    -          }
    -
    -        // flatMapGroupsWithState without aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && collectStreamingAggregates(plan).isEmpty =>
    -          m.outputMode match {
    -            case InternalOutputModes.Update =>
    -              if (outputMode != InternalOutputModes.Update) {
    -                throwError("flatMapGroupsWithState in update mode is not supported with " +
    +        // mapGroupsWithState and flatMapGroupsWithState
    +        case m: FlatMapGroupsWithState if m.isStreaming =>
    --- End diff --
    
    Wow, this is getting complicated...
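
One way to see the rules in the refactored branch at a glance is to flatten them into a single decision function. The sketch below is plain Java with hypothetical names. It reflects one reading of the quoted diff, in which `collectStreamingAggregates(plan)` means "any streaming aggregation in the query" and `collectStreamingAggregates(m)` means "an aggregation upstream of the operator". It is not the checker's actual code.

```java
import java.util.Optional;

// Hypothetical flattening of the output-mode rules in the quoted diff.
final class GroupsWithStateRulesSketch {
    enum Mode { APPEND, UPDATE, COMPLETE }

    /** Returns an error message if the combination is rejected, otherwise empty. */
    static Optional<String> check(
            boolean isMapGroupsWithState,
            Mode operatorMode,               // mode passed to flatMapGroupsWithState
            Mode queryMode,                  // output mode of the whole query
            boolean aggregationInQuery,
            boolean aggregationBeforeOperator) {
        if (isMapGroupsWithState) {
            // Allowed only without aggregation and in update query output mode.
            if (aggregationInQuery) {
                return Optional.of("mapGroupsWithState is not supported with aggregation");
            }
            if (queryMode != Mode.UPDATE) {
                return Optional.of(
                    "mapGroupsWithState is not supported with " + queryMode + " output mode");
            }
            return Optional.empty();
        }
        if (!aggregationInQuery) {
            // Without aggregation, the operator's mode must match the query's mode.
            boolean checked = operatorMode == Mode.UPDATE || operatorMode == Mode.APPEND;
            if (checked && operatorMode != queryMode) {
                return Optional.of("flatMapGroupsWithState in " + operatorMode
                    + " mode is not supported with " + queryMode + " output mode");
            }
            return Optional.empty();
        }
        // With aggregation: update mode is rejected, and so is running after an aggregation.
        if (operatorMode == Mode.UPDATE) {
            return Optional.of(
                "flatMapGroupsWithState in update mode is not supported with aggregation");
        }
        if (aggregationBeforeOperator) {
            return Optional.of(
                "flatMapGroupsWithState in append mode is not supported after aggregation");
        }
        return Optional.empty();
    }
}
```

Written this way, each rejected combination maps to exactly one early return, which may make the cases easier to compare against the tests in UnsupportedOperationsSuite.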




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #75001 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75001/testReport)** for PR 17361 at commit [`6759165`](https://github.com/apache/spark/commit/6759165f9b6d26c87b94e7acc40914ae4ca37a89).




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    LGTM




[GitHub] spark issue #17361: [SPARK-20030][SS][WIP]Event-time-based timeout for MapGr...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #74881 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74881/testReport)** for PR 17361 at commit [`3f77c01`](https://github.com/apache/spark/commit/3f77c01f255e548fb4b80a82b180f6154db53b97).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    @tdas Just FYI, I'm getting a lint-java error:
    
    yuhao@yuhao-devbox:~/workspace/github/hhbyyh/spark$ ./dev/lint-java 
    Using `mvn` from path: /usr/bin/mvn
    Checkstyle checks failed at following occurrences:
    [ERROR] src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java:[41,35] (naming) MethodName: Method name 'ProcessingTimeTimeout' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
    [ERROR] src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java:[51,35] (naming) MethodName: Method name 'EventTimeTimeout' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
    [ERROR] src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java:[54,35] (naming) MethodName: Method name 'NoTimeout' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
    
    Maybe we should suppress the style error?
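
To make the Checkstyle complaint concrete, the small Java sketch below applies the pattern from the error output to the three reported method names. The class name and the lower-camel-case variant in `main` are hypothetical; they are here for illustration and are not a proposed rename.

```java
import java.util.regex.Pattern;

// Hypothetical helper that applies the MethodName pattern from the report above.
final class MethodNamePatternSketch {
    // Pattern copied verbatim from the Checkstyle error output.
    static final Pattern METHOD_NAME = Pattern.compile("^[a-z][a-z0-9][a-zA-Z0-9_]*$");

    static boolean isAllowed(String methodName) {
        return METHOD_NAME.matcher(methodName).matches();
    }

    public static void main(String[] args) {
        String[] names = {
            "ProcessingTimeTimeout", "EventTimeTimeout", "NoTimeout",
            "processingTimeTimeout"   // hypothetical lower-camel-case variant
        };
        for (String name : names) {
            System.out.println(name + " -> " + (isAllowed(name) ? "ok" : "violation"));
        }
    }
}
```

All three reported names fail only because they begin with an uppercase letter. They are presumably capitalized to mirror the Scala object names, which is why suppressing the check, rather than renaming, comes up as an option.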




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74969/
    Test FAILed.




[GitHub] spark issue #17361: [SPARK-20030][SS][WIP]Event-time-based timeout for MapGr...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #17361: [SPARK-20030][SS][WIP]Event-time-based timeout for MapGr...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #74919 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74919/testReport)** for PR 17361 at commit [`f6d2143`](https://github.com/apache/spark/commit/f6d2143449ca012b646d78360ceb0cbf010ffc61).




[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17361#discussion_r107307445
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala ---
    @@ -519,6 +588,52 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
         )
       }
     
    +  test("flatMapGroupsWithState - streaming with event time timeout") {
    +    // Function to maintain the max event time
    +    // Returns the max event time in the state, or -1 if the state was removed by timeout
    +    val stateFunc = (
    +        key: String,
    +        values: Iterator[(String, Long)],
    +        state: KeyedState[Long]) => {
    +      val timeoutDelay = 5
    +      if (key != "a") {
    +        Iterator.empty
    +      } else {
    +        if (state.hasTimedOut) {
    +          state.remove()
    +          Iterator((key, -1))
    +        } else {
    +          val valuesSeq = values.toSeq
    +          val maxEventTime = math.max(valuesSeq.map(_._2).max, state.getOption.getOrElse(0L))
    +          val timeoutTimestampMs = maxEventTime + timeoutDelay
    +          state.update(maxEventTime)
    +          state.setTimeoutTimestamp(timeoutTimestampMs * 1000)
    +          Iterator((key, maxEventTime.toInt))
    +        }
    +      }
    +    }
    +    val inputData = MemoryStream[(String, Int)]
    +    val result =
    +      inputData.toDS
    +        .select($"_1".as("key"), $"_2".cast("timestamp").as("eventTime"))
    +        .withWatermark("eventTime", "10 seconds")
    +        .as[(String, Long)]
    +        .groupByKey[String]((x: (String, Long)) => x._1)
    +        .flatMapGroupsWithState[Long, (String, Int)](Update, EventTimeTimeout)(stateFunc)
    --- End diff --
    
    I was debugging and left them there, thinking they would help the readability of the tests. I can remove them.




[GitHub] spark pull request #17361: [SPARK-20030][SS][WIP]Event-time-based timeout fo...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17361#discussion_r107107023
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
    @@ -147,49 +147,68 @@ object UnsupportedOperationChecker {
               throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
                 "streaming DataFrames/Datasets")
     
    -        // mapGroupsWithState: Allowed only when no aggregation + Update output mode
    -        case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState =>
    -          if (collectStreamingAggregates(plan).isEmpty) {
    -            if (outputMode != InternalOutputModes.Update) {
    -              throwError("mapGroupsWithState is not supported with " +
    -                s"$outputMode output mode on a streaming DataFrame/Dataset")
    -            } else {
    -              // Allowed when no aggregation + Update output mode
    -            }
    -          } else {
    -            throwError("mapGroupsWithState is not supported with aggregation " +
    -              "on a streaming DataFrame/Dataset")
    -          }
    -
    -        // flatMapGroupsWithState without aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && collectStreamingAggregates(plan).isEmpty =>
    -          m.outputMode match {
    -            case InternalOutputModes.Update =>
    -              if (outputMode != InternalOutputModes.Update) {
    -                throwError("flatMapGroupsWithState in update mode is not supported with " +
    +        // mapGroupsWithState and flatMapGroupsWithState
    +        case m: FlatMapGroupsWithState if m.isStreaming =>
    --- End diff --
    
    This refactoring passes all existing tests in UnsupportedOperationsSuite.
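
As a reading aid for the consolidated `case m: FlatMapGroupsWithState` branch (quoted in full later in this thread), the decision logic can be modelled without Spark as a small pure function. This is only a sketch under stated assumptions: `OutputModeCheckSketch`, `Mode`, `StatefulOp`, and `validate` are hypothetical names, not Spark APIs, and the two boolean flags stand in for the `collectStreamingAggregates(plan)` and `collectStreamingAggregates(m)` calls in the diff.

```scala
// Spark-free model of the output-mode/aggregation checks in the refactored case.
// Every name in this object is hypothetical and exists only for illustration.
object OutputModeCheckSketch {
  sealed trait Mode
  case object Append extends Mode
  case object Update extends Mode
  case object Complete extends Mode

  final case class StatefulOp(
      isMapGroupsWithState: Boolean, // true for mapGroupsWithState
      opMode: Mode,                  // output mode declared on the operator
      aggInQuery: Boolean,           // stands in for collectStreamingAggregates(plan).nonEmpty
      aggBeforeOp: Boolean)          // stands in for collectStreamingAggregates(m).nonEmpty

  // Returns Some(reason) for an unsupported combination, None when it is allowed.
  def validate(op: StatefulOp, queryMode: Mode): Option[String] =
    if (op.isMapGroupsWithState) {
      // Allowed only without aggregation and in Update query output mode.
      if (op.aggInQuery) Some("mapGroupsWithState with aggregation")
      else if (queryMode != Update) Some(s"mapGroupsWithState with $queryMode output mode")
      else None
    } else if (!op.aggInQuery) {
      // Without aggregation, the operator's mode must match the query's mode.
      op.opMode match {
        case Update if queryMode != Update =>
          Some(s"flatMapGroupsWithState in update mode with $queryMode output mode")
        case Append if queryMode != Append =>
          Some(s"flatMapGroupsWithState in append mode with $queryMode output mode")
        case _ => None
      }
    } else {
      // With aggregation: update operator mode is rejected, and so is an
      // append-mode operator placed after an aggregation.
      if (op.opMode == Update) Some("flatMapGroupsWithState in update mode with aggregation")
      else if (op.aggBeforeOp) Some("flatMapGroupsWithState in append mode after aggregation")
      else None
    }
}
```

Writing the branches as one function makes it easier to see that each combination is decided in exactly one place, which is the point of collapsing the four separate `case` clauses into one.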




[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17361#discussion_r107307722
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
    @@ -147,49 +147,68 @@ object UnsupportedOperationChecker {
               throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
                 "streaming DataFrames/Datasets")
     
    -        // mapGroupsWithState: Allowed only when no aggregation + Update output mode
    -        case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState =>
    -          if (collectStreamingAggregates(plan).isEmpty) {
    -            if (outputMode != InternalOutputModes.Update) {
    -              throwError("mapGroupsWithState is not supported with " +
    -                s"$outputMode output mode on a streaming DataFrame/Dataset")
    -            } else {
    -              // Allowed when no aggregation + Update output mode
    -            }
    -          } else {
    -            throwError("mapGroupsWithState is not supported with aggregation " +
    -              "on a streaming DataFrame/Dataset")
    -          }
    -
    -        // flatMapGroupsWithState without aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && collectStreamingAggregates(plan).isEmpty =>
    -          m.outputMode match {
    -            case InternalOutputModes.Update =>
    -              if (outputMode != InternalOutputModes.Update) {
    -                throwError("flatMapGroupsWithState in update mode is not supported with " +
    +        // mapGroupsWithState and flatMapGroupsWithState
    +        case m: FlatMapGroupsWithState if m.isStreaming =>
    +
    +          // Check compatibility with output modes and aggregations in query
    +          val aggsAfterFlatMapGroups = collectStreamingAggregates(plan)
    +
    +          if (m.isMapGroupsWithState) {                       // check mapGroupsWithState
    +            // allowed only in update query output mode and without aggregation
    +            if (aggsAfterFlatMapGroups.nonEmpty) {
    +              throwError(
    +                "mapGroupsWithState is not supported with aggregation " +
    +                  "on a streaming DataFrame/Dataset")
    +            } else if (outputMode != InternalOutputModes.Update) {
    +              throwError(
    +                "mapGroupsWithState is not supported with " +
                       s"$outputMode output mode on a streaming DataFrame/Dataset")
    +            }
    +          } else {                                           // check flatMapGroupsWithState
    +            if (aggsAfterFlatMapGroups.isEmpty) {
    +              // flatMapGroupsWithState without aggregation: operation's output mode must
    +              // match query output mode
    +              m.outputMode match {
    +                case InternalOutputModes.Update if outputMode != InternalOutputModes.Update =>
    +                  throwError(
    +                    "flatMapGroupsWithState in update mode is not supported with " +
    +                      s"$outputMode output mode on a streaming DataFrame/Dataset")
    +
    +                case InternalOutputModes.Append if outputMode != InternalOutputModes.Append =>
    +                  throwError(
    +                    "flatMapGroupsWithState in append mode is not supported with " +
    +                      s"$outputMode output mode on a streaming DataFrame/Dataset")
    +
    +                case _ =>
                   }
    -            case InternalOutputModes.Append =>
    -              if (outputMode != InternalOutputModes.Append) {
    -                throwError("flatMapGroupsWithState in append mode is not supported with " +
    -                  s"$outputMode output mode on a streaming DataFrame/Dataset")
    +            } else {
    +              // flatMapGroupsWithState with aggregation: update operation mode not allowed, and
    +              // *groupsWithState after aggregation not allowed
    +              if (m.outputMode == InternalOutputModes.Update) {
    +                throwError(
    +                  "flatMapGroupsWithState in update mode is not supported with " +
    +                    "aggregation on a streaming DataFrame/Dataset")
    +              } else if (collectStreamingAggregates(m).nonEmpty) {
    +                throwError(
    +                  "flatMapGroupsWithState in append mode is not supported after " +
    +                    s"aggregation on a streaming DataFrame/Dataset")
                   }
    +            }
               }
     
    -        // flatMapGroupsWithState(Update) with aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && m.outputMode == InternalOutputModes.Update
    -            && collectStreamingAggregates(plan).nonEmpty =>
    -          throwError("flatMapGroupsWithState in update mode is not supported with " +
    -            "aggregation on a streaming DataFrame/Dataset")
    -
    -        // flatMapGroupsWithState(Append) with aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && m.outputMode == InternalOutputModes.Append
    -            && collectStreamingAggregates(m).nonEmpty =>
    -          throwError("flatMapGroupsWithState in append mode is not supported after " +
    -            s"aggregation on a streaming DataFrame/Dataset")
    +          // Check compatibility with timeout configs
    +          if (m.timeout == EventTimeTimeout) {
    +            // With event time timeout, watermark must be defined.
    +            val watermarkAttributes = m.child.output.collect {
    +              case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
    +            }
    +            if (watermarkAttributes.isEmpty) {
    +              throwError(
    +                "Event time timeout is not supported in a [map|flatMap]GroupsWithState " +
    --- End diff --
    
    I want it to be consistent, and the docs hyphenate.




[GitHub] spark issue #17361: [SPARK-20030][SS][WIP]Event-time-based timeout for MapGr...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74894/




[GitHub] spark issue #17361: [SPARK-20030][SS][WIP]Event-time-based timeout for MapGr...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #74910 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74910/testReport)** for PR 17361 at commit [`ac17886`](https://github.com/apache/spark/commit/ac17886fc8a24498af1fcc7eb70527af370ce498).




[GitHub] spark issue #17361: [SPARK-20030][SS][WIP]Event-time-based timeout for MapGr...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #74919 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74919/testReport)** for PR 17361 at commit [`f6d2143`](https://github.com/apache/spark/commit/f6d2143449ca012b646d78360ceb0cbf010ffc61).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #17361: [SPARK-20030][SS][WIP]Event-time-based timeout for MapGr...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #74969 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74969/testReport)** for PR 17361 at commit [`2c5592c`](https://github.com/apache/spark/commit/2c5592cd12843bd532ea79f8705240d73623ae05).




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/75003/




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #3604 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3604/testReport)** for PR 17361 at commit [`9c9668b`](https://github.com/apache/spark/commit/9c9668b9e2d76e3ef56f6a8094c76b5a38178d1b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17361: [SPARK-20030][SS][WIP]Event-time-based timeout for MapGr...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74910/




[GitHub] spark issue #17361: [SPARK-20030][SS][WIP]Event-time-based timeout for MapGr...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #74894 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74894/testReport)** for PR 17361 at commit [`6e9f408`](https://github.com/apache/spark/commit/6e9f408d73090429d7497840d6daa7a4e19439e6).




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/75001/




[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17361#discussion_r107307283
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
    @@ -147,49 +147,68 @@ object UnsupportedOperationChecker {
               throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
                 "streaming DataFrames/Datasets")
     
    -        // mapGroupsWithState: Allowed only when no aggregation + Update output mode
    -        case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState =>
    -          if (collectStreamingAggregates(plan).isEmpty) {
    -            if (outputMode != InternalOutputModes.Update) {
    -              throwError("mapGroupsWithState is not supported with " +
    -                s"$outputMode output mode on a streaming DataFrame/Dataset")
    -            } else {
    -              // Allowed when no aggregation + Update output mode
    -            }
    -          } else {
    -            throwError("mapGroupsWithState is not supported with aggregation " +
    -              "on a streaming DataFrame/Dataset")
    -          }
    -
    -        // flatMapGroupsWithState without aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && collectStreamingAggregates(plan).isEmpty =>
    -          m.outputMode match {
    -            case InternalOutputModes.Update =>
    -              if (outputMode != InternalOutputModes.Update) {
    -                throwError("flatMapGroupsWithState in update mode is not supported with " +
    +        // mapGroupsWithState and flatMapGroupsWithState
    +        case m: FlatMapGroupsWithState if m.isStreaming =>
    +
    +          // Check compatibility with output modes and aggregations in query
    +          val aggsAfterFlatMapGroups = collectStreamingAggregates(plan)
    +
    +          if (m.isMapGroupsWithState) {                       // check mapGroupsWithState
    +            // allowed only in update query output mode and without aggregation
    +            if (aggsAfterFlatMapGroups.nonEmpty) {
    +              throwError(
    +                "mapGroupsWithState is not supported with aggregation " +
    +                  "on a streaming DataFrame/Dataset")
    +            } else if (outputMode != InternalOutputModes.Update) {
    +              throwError(
    +                "mapGroupsWithState is not supported with " +
                       s"$outputMode output mode on a streaming DataFrame/Dataset")
    +            }
    +          } else {                                           // check flatMapGroupsWithState
    +            if (aggsAfterFlatMapGroups.isEmpty) {
    +              // flatMapGroupsWithState without aggregation: operation's output mode must
    +              // match query output mode
    +              m.outputMode match {
    +                case InternalOutputModes.Update if outputMode != InternalOutputModes.Update =>
    +                  throwError(
    +                    "flatMapGroupsWithState in update mode is not supported with " +
    +                      s"$outputMode output mode on a streaming DataFrame/Dataset")
    +
    +                case InternalOutputModes.Append if outputMode != InternalOutputModes.Append =>
    +                  throwError(
    +                    "flatMapGroupsWithState in append mode is not supported with " +
    +                      s"$outputMode output mode on a streaming DataFrame/Dataset")
    +
    +                case _ =>
                   }
    -            case InternalOutputModes.Append =>
    -              if (outputMode != InternalOutputModes.Append) {
    -                throwError("flatMapGroupsWithState in append mode is not supported with " +
    -                  s"$outputMode output mode on a streaming DataFrame/Dataset")
    +            } else {
    +              // flatMapGroupsWithState with aggregation: update operation mode not allowed, and
    +              // *groupsWithState after aggregation not allowed
    +              if (m.outputMode == InternalOutputModes.Update) {
    +                throwError(
    +                  "flatMapGroupsWithState in update mode is not supported with " +
    +                    "aggregation on a streaming DataFrame/Dataset")
    +              } else if (collectStreamingAggregates(m).nonEmpty) {
    +                throwError(
    +                  "flatMapGroupsWithState in append mode is not supported after " +
    +                    s"aggregation on a streaming DataFrame/Dataset")
                   }
    +            }
               }
     
    -        // flatMapGroupsWithState(Update) with aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && m.outputMode == InternalOutputModes.Update
    -            && collectStreamingAggregates(plan).nonEmpty =>
    -          throwError("flatMapGroupsWithState in update mode is not supported with " +
    -            "aggregation on a streaming DataFrame/Dataset")
    -
    -        // flatMapGroupsWithState(Append) with aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && m.outputMode == InternalOutputModes.Append
    -            && collectStreamingAggregates(m).nonEmpty =>
    -          throwError("flatMapGroupsWithState in append mode is not supported after " +
    -            s"aggregation on a streaming DataFrame/Dataset")
    +          // Check compatibility with timeout configs
    +          if (m.timeout == EventTimeTimeout) {
    +            // With event time timeout, watermark must be defined.
    +            val watermarkAttributes = m.child.output.collect {
    +              case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
    +            }
    +            if (watermarkAttributes.isEmpty) {
    +              throwError(
    +                "Event time timeout is not supported in a [map|flatMap]GroupsWithState " +
    +                  "without watermark. Use '[Dataset/DataFrame].withWatermark()' to " +
    --- End diff --
    
    Okay. 
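
For readers following the event-time timeout check at the end of the diff above, here is a minimal, Spark-free sketch of the same idea: an event-time timeout is rejected unless at least one input attribute carries watermark metadata. Everything in it is an assumption made for illustration. `WatermarkCheckSketch`, `Attr`, `Timeout`, and `checkTimeout` are hypothetical, and `DelayKey` is a placeholder string standing in for `EventTimeWatermark.delayKey`, not its actual value.

```scala
// Spark-free model of the "watermark must be defined" check.
// Every name in this object is hypothetical and exists only for illustration.
object WatermarkCheckSketch {
  // Stand-in for a Catalyst attribute; the real metadata type is richer than a Map.
  final case class Attr(name: String, metadata: Map[String, Long])

  // Placeholder key; stands in for EventTimeWatermark.delayKey.
  val DelayKey = "watermarkDelayMs"

  sealed trait Timeout
  case object NoTimeout extends Timeout
  case object EventTimeTimeout extends Timeout

  // Returns Some(reason) when an event-time timeout is requested but no input
  // attribute is tagged with a watermark delay; None otherwise.
  def checkTimeout(timeout: Timeout, childOutput: Seq[Attr]): Option[String] = {
    val watermarkAttributes = childOutput.filter(_.metadata.contains(DelayKey))
    if (timeout == EventTimeTimeout && watermarkAttributes.isEmpty)
      Some("event-time timeout requires a watermark on the input")
    else None
  }
}
```

In the real checker the remedy named in the error message is to call `withWatermark()` on the input Dataset/DataFrame before grouping, which is what tags the event-time column with the delay metadata that this check looks for.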




[GitHub] spark issue #17361: [SPARK-20030][SS][WIP]Event-time-based timeout for MapGr...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #74894 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74894/testReport)** for PR 17361 at commit [`6e9f408`](https://github.com/apache/spark/commit/6e9f408d73090429d7497840d6daa7a4e19439e6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/17361




[GitHub] spark issue #17361: [SPARK-20030][SS][WIP]Event-time-based timeout for MapGr...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #75003 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75003/testReport)** for PR 17361 at commit [`d0758eb`](https://github.com/apache/spark/commit/d0758ebd6b78c6cde97e9750275a0fbba93da764).




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #3604 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3604/testReport)** for PR 17361 at commit [`9c9668b`](https://github.com/apache/spark/commit/9c9668b9e2d76e3ef56f6a8094c76b5a38178d1b).




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #75003 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75003/testReport)** for PR 17361 at commit [`d0758eb`](https://github.com/apache/spark/commit/d0758ebd6b78c6cde97e9750275a0fbba93da764).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17361: [SPARK-20030][SS][WIP]Event-time-based timeout for MapGr...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17361: [SPARK-20030][SS][WIP]Event-time-based timeout for MapGr...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74881/
    Test FAILed.


[GitHub] spark pull request #17361: [SPARK-20030][SS][WIP]Event-time-based timeout fo...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17361#discussion_r107106415
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
    @@ -147,49 +147,68 @@ object UnsupportedOperationChecker {
               throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
                 "streaming DataFrames/Datasets")
     
    -        // mapGroupsWithState: Allowed only when no aggregation + Update output mode
    -        case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState =>
    -          if (collectStreamingAggregates(plan).isEmpty) {
    -            if (outputMode != InternalOutputModes.Update) {
    -              throwError("mapGroupsWithState is not supported with " +
    -                s"$outputMode output mode on a streaming DataFrame/Dataset")
    -            } else {
    -              // Allowed when no aggregation + Update output mode
    -            }
    -          } else {
    -            throwError("mapGroupsWithState is not supported with aggregation " +
    -              "on a streaming DataFrame/Dataset")
    -          }
    -
    -        // flatMapGroupsWithState without aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && collectStreamingAggregates(plan).isEmpty =>
    -          m.outputMode match {
    -            case InternalOutputModes.Update =>
    -              if (outputMode != InternalOutputModes.Update) {
    -                throwError("flatMapGroupsWithState in update mode is not supported with " +
    +        // mapGroupsWithState and flatMapGroupsWithState
    +        case m: FlatMapGroupsWithState if m.isStreaming =>
    --- End diff --
    
    Refactored this to contain all tests related to map/flatMapGroupsWithState under a single case statement. This way it's easier to reason about whether all possible combinations of operator + output mode + aggregation have been covered.
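
As a rough illustration of the single-case structure described above, here is a stand-alone sketch of the same decision tree, following the branches visible in the refactored diff. It does not use Spark: `Mode`, `Op`, `check`, and the two aggregation flags are hypothetical stand-ins for `InternalOutputModes`, the `FlatMapGroupsWithState` node, and the `collectStreamingAggregates(plan)` / `collectStreamingAggregates(m)` results. It returns an error message instead of calling `throwError`.

```scala
object GroupsWithStateCheckSketch {
  // Hypothetical stand-ins for InternalOutputModes (not Spark's classes).
  sealed trait Mode
  case object Append extends Mode
  case object Update extends Mode
  case object Complete extends Mode

  // isMap:      operator is mapGroupsWithState (otherwise flatMapGroupsWithState)
  // opMode:     output mode declared on the operator itself
  // aggInQuery: assumed result of collectStreamingAggregates(plan).nonEmpty
  // aggBelowOp: assumed result of collectStreamingAggregates(m).nonEmpty
  final case class Op(isMap: Boolean, opMode: Mode, aggInQuery: Boolean, aggBelowOp: Boolean)

  // Returns Some(error) for an unsupported combination, None when allowed.
  def check(op: Op, queryMode: Mode): Option[String] = {
    if (op.isMap) {
      // mapGroupsWithState: only without aggregation and in Update query mode
      if (op.aggInQuery) {
        Some("mapGroupsWithState is not supported with aggregation")
      } else if (queryMode != Update) {
        Some(s"mapGroupsWithState is not supported with $queryMode output mode")
      } else {
        None
      }
    } else if (!op.aggInQuery) {
      // flatMapGroupsWithState without aggregation: operator mode must match query mode
      op.opMode match {
        case Update if queryMode != Update =>
          Some(s"flatMapGroupsWithState in update mode is not supported with $queryMode output mode")
        case Append if queryMode != Append =>
          Some(s"flatMapGroupsWithState in append mode is not supported with $queryMode output mode")
        case _ => None
      }
    } else {
      // flatMapGroupsWithState with aggregation
      if (op.opMode == Update) {
        Some("flatMapGroupsWithState in update mode is not supported with aggregation")
      } else if (op.aggBelowOp) {
        Some("flatMapGroupsWithState in append mode is not supported after aggregation")
      } else {
        None
      }
    }
  }
}
```

With every combination routed through one function, each (operator, operator mode, query mode, aggregation) tuple maps to exactly one branch, which is the property the refactoring is after.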


[GitHub] spark issue #17361: [SPARK-20030][SS][WIP]Event-time-based timeout for MapGr...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #74910 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74910/testReport)** for PR 17361 at commit [`ac17886`](https://github.com/apache/spark/commit/ac17886fc8a24498af1fcc7eb70527af370ce498).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request #17361: [SPARK-20030][SS] Event-time-based timeout for Ma...

Posted by marmbrus <gi...@git.apache.org>.
Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17361#discussion_r107304531
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
    @@ -147,49 +147,68 @@ object UnsupportedOperationChecker {
               throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
                 "streaming DataFrames/Datasets")
     
    -        // mapGroupsWithState: Allowed only when no aggregation + Update output mode
    -        case m: FlatMapGroupsWithState if m.isStreaming && m.isMapGroupsWithState =>
    -          if (collectStreamingAggregates(plan).isEmpty) {
    -            if (outputMode != InternalOutputModes.Update) {
    -              throwError("mapGroupsWithState is not supported with " +
    -                s"$outputMode output mode on a streaming DataFrame/Dataset")
    -            } else {
    -              // Allowed when no aggregation + Update output mode
    -            }
    -          } else {
    -            throwError("mapGroupsWithState is not supported with aggregation " +
    -              "on a streaming DataFrame/Dataset")
    -          }
    -
    -        // flatMapGroupsWithState without aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && collectStreamingAggregates(plan).isEmpty =>
    -          m.outputMode match {
    -            case InternalOutputModes.Update =>
    -              if (outputMode != InternalOutputModes.Update) {
    -                throwError("flatMapGroupsWithState in update mode is not supported with " +
    +        // mapGroupsWithState and flatMapGroupsWithState
    +        case m: FlatMapGroupsWithState if m.isStreaming =>
    +
    +          // Check compatibility with output modes and aggregations in query
    +          val aggsAfterFlatMapGroups = collectStreamingAggregates(plan)
    +
    +          if (m.isMapGroupsWithState) {                       // check mapGroupsWithState
    +            // allowed only in update query output mode and without aggregation
    +            if (aggsAfterFlatMapGroups.nonEmpty) {
    +              throwError(
    +                "mapGroupsWithState is not supported with aggregation " +
    +                  "on a streaming DataFrame/Dataset")
    +            } else if (outputMode != InternalOutputModes.Update) {
    +              throwError(
    +                "mapGroupsWithState is not supported with " +
                       s"$outputMode output mode on a streaming DataFrame/Dataset")
    +            }
    +          } else {                                           // check flatMapGroupsWithState
    +            if (aggsAfterFlatMapGroups.isEmpty) {
    +              // flatMapGroupsWithState without aggregation: operation's output mode must
    +              // match query output mode
    +              m.outputMode match {
    +                case InternalOutputModes.Update if outputMode != InternalOutputModes.Update =>
    +                  throwError(
    +                    "flatMapGroupsWithState in update mode is not supported with " +
    +                      s"$outputMode output mode on a streaming DataFrame/Dataset")
    +
    +                case InternalOutputModes.Append if outputMode != InternalOutputModes.Append =>
    +                  throwError(
    +                    "flatMapGroupsWithState in append mode is not supported with " +
    +                      s"$outputMode output mode on a streaming DataFrame/Dataset")
    +
    +                case _ =>
                   }
    -            case InternalOutputModes.Append =>
    -              if (outputMode != InternalOutputModes.Append) {
    -                throwError("flatMapGroupsWithState in append mode is not supported with " +
    -                  s"$outputMode output mode on a streaming DataFrame/Dataset")
    +            } else {
    +              // flatMapGroupsWithState with aggregation: update operation mode not allowed, and
    +              // *groupsWithState after aggregation not allowed
    +              if (m.outputMode == InternalOutputModes.Update) {
    +                throwError(
    +                  "flatMapGroupsWithState in update mode is not supported with " +
    +                    "aggregation on a streaming DataFrame/Dataset")
    +              } else if (collectStreamingAggregates(m).nonEmpty) {
    +                throwError(
    +                  "flatMapGroupsWithState in append mode is not supported after " +
    +                    s"aggregation on a streaming DataFrame/Dataset")
                   }
    +            }
               }
     
    -        // flatMapGroupsWithState(Update) with aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && m.outputMode == InternalOutputModes.Update
    -            && collectStreamingAggregates(plan).nonEmpty =>
    -          throwError("flatMapGroupsWithState in update mode is not supported with " +
    -            "aggregation on a streaming DataFrame/Dataset")
    -
    -        // flatMapGroupsWithState(Append) with aggregation
    -        case m: FlatMapGroupsWithState
    -          if m.isStreaming && m.outputMode == InternalOutputModes.Append
    -            && collectStreamingAggregates(m).nonEmpty =>
    -          throwError("flatMapGroupsWithState in append mode is not supported after " +
    -            s"aggregation on a streaming DataFrame/Dataset")
    +          // Check compatibility with timeout configs
    +          if (m.timeout == EventTimeTimeout) {
    +            // With event time timeout, watermark must be defined.
    +            val watermarkAttributes = m.child.output.collect {
    +              case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
    +            }
    +            if (watermarkAttributes.isEmpty) {
    +              throwError(
    +                "Event time timeout is not supported in a [map|flatMap]GroupsWithState " +
    --- End diff --
    
    I think we are hyphenating event-time?
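
For context, the timeout check at the end of the diff above can be sketched without Spark roughly as follows. This is only an approximation under stated assumptions: `Attr`, `Timeout`, `DelayKey`, and `check` are hypothetical stand-ins for Catalyst's `Attribute`, the timeout config, and `EventTimeWatermark.delayKey`. The key string and the error text are illustrative, since the real message is cut off in the diff.

```scala
object EventTimeTimeoutCheckSketch {
  // Hypothetical stand-in for a Catalyst attribute: a name plus metadata.
  final case class Attr(name: String, metadata: Map[String, Long])

  // Stand-in for EventTimeWatermark.delayKey; the string value is an assumption.
  val DelayKey = "spark.watermarkDelayMs"

  // Hypothetical stand-ins for the timeout configurations.
  sealed trait Timeout
  case object NoTimeout extends Timeout
  case object ProcessingTimeTimeout extends Timeout
  case object EventTimeTimeout extends Timeout

  // Returns Some(error) when event-time timeout is requested but no child
  // output attribute carries watermark metadata; None otherwise.
  def check(timeout: Timeout, childOutput: Seq[Attr]): Option[String] = {
    if (timeout == EventTimeTimeout) {
      val watermarkAttributes = childOutput.filter(_.metadata.contains(DelayKey))
      if (watermarkAttributes.isEmpty) {
        Some("event-time timeout requires a watermark on the input (illustrative message)")
      } else {
        None
      }
    } else {
      None
    }
  }
}
```

The check only looks at the operator's child output, so in this sketch a watermark counts only if an attribute carrying its metadata reaches the operator.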


[GitHub] spark issue #17361: [SPARK-20030][SS] Event-time-based timeout for MapGroups...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17361
  
    **[Test build #75010 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75010/testReport)** for PR 17361 at commit [`64b6abf`](https://github.com/apache/spark/commit/64b6abf89189fe8a3f3d2f58f341a9b58a95a6d2).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.

