You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2017/03/22 19:28:19 UTC

[2/2] spark git commit: [SPARK-20057][SS] Renamed KeyedState to GroupState in mapGroupsWithState

[SPARK-20057][SS] Renamed KeyedState to GroupState in mapGroupsWithState

## What changes were proposed in this pull request?

Since the state is tied to a "group" in the "mapGroupsWithState" operations, it's better to call the state "GroupState" instead of "KeyedState" (i.e., naming it after the group rather than a key). This makes the API more general if we extend this operation to RelationalGroupedDataset and the Python APIs.

## How was this patch tested?
Existing unit tests.

Author: Tathagata Das <ta...@gmail.com>

Closes #17385 from tdas/SPARK-20057.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82b598b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82b598b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82b598b9

Branch: refs/heads/master
Commit: 82b598b963a21ae9d6a2a9638e86b4165c2a78c9
Parents: 80fd070
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Mar 22 12:30:36 2017 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Mar 22 12:30:36 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/streaming/GroupStateTimeout.java  |  54 ++++
 .../spark/sql/streaming/KeyedStateTimeout.java  |  54 ----
 .../sql/catalyst/plans/logical/object.scala     |  18 +-
 .../streaming/JavaGroupStateTimeoutSuite.java   |  33 +++
 .../streaming/JavaKeyedStateTimeoutSuite.java   |  29 --
 .../FlatMapGroupsWithStateFunction.java         |   4 +-
 .../function/MapGroupsWithStateFunction.java    |   4 +-
 .../spark/sql/KeyValueGroupedDataset.scala      |  46 +--
 .../apache/spark/sql/execution/objects.scala    |   8 +-
 .../streaming/FlatMapGroupsWithStateExec.scala  |  16 +-
 .../execution/streaming/GroupStateImpl.scala    | 228 +++++++++++++++
 .../execution/streaming/KeyedStateImpl.scala    | 227 ---------------
 .../execution/streaming/statefulOperators.scala |   4 +-
 .../apache/spark/sql/streaming/GroupState.scala | 285 +++++++++++++++++++
 .../apache/spark/sql/streaming/KeyedState.scala | 285 -------------------
 .../org/apache/spark/sql/JavaDatasetSuite.java  |   4 +-
 .../streaming/FlatMapGroupsWithStateSuite.scala | 122 ++++----
 17 files changed, 713 insertions(+), 708 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java
new file mode 100644
index 0000000..bd5e2d7
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.catalyst.plans.logical.*;
+
+/**
+ * Represents the type of timeouts possible for the Dataset operations
+ * `mapGroupsWithState` and `flatMapGroupsWithState`. See documentation on
+ * `GroupState` for more details.
+ *
+ * @since 2.2.0
+ */
+@Experimental
+@InterfaceStability.Evolving
+public class GroupStateTimeout {
+
+  /**
+   * Timeout based on processing time. The duration of timeout can be set for each group in
+   * `map/flatMapGroupsWithState` by calling `GroupState.setTimeoutDuration()`. See documentation
+   * on `GroupState` for more details.
+   */
+  public static GroupStateTimeout ProcessingTimeTimeout() { return ProcessingTimeTimeout$.MODULE$; }
+
+  /**
+   * Timeout based on event-time. The event-time timestamp for timeout can be set for each
+   * group in `map/flatMapGroupsWithState` by calling `GroupState.setTimeoutTimestamp()`.
+   * In addition, you have to define the watermark in the query using `Dataset.withWatermark`.
+   * When the watermark advances beyond the set timestamp of a group and the group has not
+   * received any data, then the group times out. See documentation on
+   * `GroupState` for more details.
+   */
+  public static GroupStateTimeout EventTimeTimeout() { return EventTimeTimeout$.MODULE$; }
+
+  /** No timeout. */
+  public static GroupStateTimeout NoTimeout() { return NoTimeout$.MODULE$; }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java
deleted file mode 100644
index e2e7ab1..0000000
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming;
-
-import org.apache.spark.annotation.Experimental;
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.catalyst.plans.logical.*;
-
-/**
- * Represents the type of timeouts possible for the Dataset operations
- * `mapGroupsWithState` and `flatMapGroupsWithState`. See documentation on
- * `KeyedState` for more details.
- *
- * @since 2.2.0
- */
-@Experimental
-@InterfaceStability.Evolving
-public class KeyedStateTimeout {
-
-  /**
-   * Timeout based on processing time. The duration of timeout can be set for each group in
-   * `map/flatMapGroupsWithState` by calling `KeyedState.setTimeoutDuration()`. See documentation
-   * on `KeyedState` for more details.
-   */
-  public static KeyedStateTimeout ProcessingTimeTimeout() { return ProcessingTimeTimeout$.MODULE$; }
-
-  /**
-   * Timeout based on event-time. The event-time timestamp for timeout can be set for each
-   * group in `map/flatMapGroupsWithState` by calling `KeyedState.setTimeoutTimestamp()`.
-   * In addition, you have to define the watermark in the query using `Dataset.withWatermark`.
-   * When the watermark advances beyond the set timestamp of a group and the group has not
-   * received any data, then the group times out. See documentation on
-   * `KeyedState` for more details.
-   */
-  public static KeyedStateTimeout EventTimeTimeout() { return EventTimeTimeout$.MODULE$; }
-
-  /** No timeout. */
-  public static KeyedStateTimeout NoTimeout() { return NoTimeout$.MODULE$; }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index e0ecf8c..6225b3f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.objects.Invoke
-import org.apache.spark.sql.streaming.{KeyedStateTimeout, OutputMode }
+import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode }
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -351,22 +351,22 @@ case class MapGroups(
     child: LogicalPlan) extends UnaryNode with ObjectProducer
 
 /** Internal class representing State */
-trait LogicalKeyedState[S]
+trait LogicalGroupState[S]
 
 /** Types of timeouts used in FlatMapGroupsWithState */
-case object NoTimeout extends KeyedStateTimeout
-case object ProcessingTimeTimeout extends KeyedStateTimeout
-case object EventTimeTimeout extends KeyedStateTimeout
+case object NoTimeout extends GroupStateTimeout
+case object ProcessingTimeTimeout extends GroupStateTimeout
+case object EventTimeTimeout extends GroupStateTimeout
 
 /** Factory for constructing new `MapGroupsWithState` nodes. */
 object FlatMapGroupsWithState {
   def apply[K: Encoder, V: Encoder, S: Encoder, U: Encoder](
-      func: (Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any],
+      func: (Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any],
       groupingAttributes: Seq[Attribute],
       dataAttributes: Seq[Attribute],
       outputMode: OutputMode,
       isMapGroupsWithState: Boolean,
-      timeout: KeyedStateTimeout,
+      timeout: GroupStateTimeout,
       child: LogicalPlan): LogicalPlan = {
     val encoder = encoderFor[S]
 
@@ -404,7 +404,7 @@ object FlatMapGroupsWithState {
  * @param timeout used to timeout groups that have not received data in a while
  */
 case class FlatMapGroupsWithState(
-    func: (Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any],
+    func: (Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any],
     keyDeserializer: Expression,
     valueDeserializer: Expression,
     groupingAttributes: Seq[Attribute],
@@ -413,7 +413,7 @@ case class FlatMapGroupsWithState(
     stateEncoder: ExpressionEncoder[Any],
     outputMode: OutputMode,
     isMapGroupsWithState: Boolean = false,
-    timeout: KeyedStateTimeout,
+    timeout: GroupStateTimeout,
     child: LogicalPlan) extends UnaryNode with ObjectProducer {
 
   if (isMapGroupsWithState) {

http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaGroupStateTimeoutSuite.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaGroupStateTimeoutSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaGroupStateTimeoutSuite.java
new file mode 100644
index 0000000..2e8f2e3
--- /dev/null
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaGroupStateTimeoutSuite.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeTimeout$;
+import org.apache.spark.sql.catalyst.plans.logical.NoTimeout$;
+import org.apache.spark.sql.catalyst.plans.logical.ProcessingTimeTimeout$;
+import org.junit.Test;
+
+public class JavaGroupStateTimeoutSuite {
+
+  @Test
+  public void testTimeouts() {
+    assert (GroupStateTimeout.ProcessingTimeTimeout() == ProcessingTimeTimeout$.MODULE$);
+    assert (GroupStateTimeout.EventTimeTimeout() == EventTimeTimeout$.MODULE$);
+    assert (GroupStateTimeout.NoTimeout() == NoTimeout$.MODULE$);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaKeyedStateTimeoutSuite.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaKeyedStateTimeoutSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaKeyedStateTimeoutSuite.java
deleted file mode 100644
index 02c94b0..0000000
--- a/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaKeyedStateTimeoutSuite.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming;
-
-import org.apache.spark.sql.catalyst.plans.logical.ProcessingTimeTimeout$;
-import org.junit.Test;
-
-public class JavaKeyedStateTimeoutSuite {
-
-  @Test
-  public void testTimeouts() {
-    assert(KeyedStateTimeout.ProcessingTimeTimeout() == ProcessingTimeTimeout$.MODULE$);
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java b/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java
index bdda8aa..026b37c 100644
--- a/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java
+++ b/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java
@@ -22,7 +22,7 @@ import java.util.Iterator;
 
 import org.apache.spark.annotation.Experimental;
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.streaming.KeyedState;
+import org.apache.spark.sql.streaming.GroupState;
 
 /**
  * ::Experimental::
@@ -35,5 +35,5 @@ import org.apache.spark.sql.streaming.KeyedState;
 @Experimental
 @InterfaceStability.Evolving
 public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
-  Iterator<R> call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
+  Iterator<R> call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java b/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java
index 70f3f01..353e988 100644
--- a/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java
+++ b/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java
@@ -22,7 +22,7 @@ import java.util.Iterator;
 
 import org.apache.spark.annotation.Experimental;
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.streaming.KeyedState;
+import org.apache.spark.sql.streaming.GroupState;
 
 /**
  * ::Experimental::
@@ -34,5 +34,5 @@ import org.apache.spark.sql.streaming.KeyedState;
 @Experimental
 @InterfaceStability.Evolving
 public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
-  R call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
+  R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 96437f8..87c5621 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.expressions.ReduceAggregator
-import org.apache.spark.sql.streaming.{KeyedState, KeyedStateTimeout, OutputMode}
+import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
 
 /**
  * :: Experimental ::
@@ -228,7 +228,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    * For a static batch Dataset, the function will be invoked once per group. For a streaming
    * Dataset, the function will be invoked for each group repeatedly in every trigger, and
    * updates to each group's state will be saved across invocations.
-   * See [[org.apache.spark.sql.streaming.KeyedState]] for more details.
+   * See [[org.apache.spark.sql.streaming.GroupState]] for more details.
    *
    * @tparam S The type of the user-defined state. Must be encodable to Spark SQL types.
    * @tparam U The type of the output objects. Must be encodable to Spark SQL types.
@@ -240,17 +240,17 @@ class KeyValueGroupedDataset[K, V] private[sql](
   @Experimental
   @InterfaceStability.Evolving
   def mapGroupsWithState[S: Encoder, U: Encoder](
-      func: (K, Iterator[V], KeyedState[S]) => U): Dataset[U] = {
-    val flatMapFunc = (key: K, it: Iterator[V], s: KeyedState[S]) => Iterator(func(key, it, s))
+      func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
+    val flatMapFunc = (key: K, it: Iterator[V], s: GroupState[S]) => Iterator(func(key, it, s))
     Dataset[U](
       sparkSession,
       FlatMapGroupsWithState[K, V, S, U](
-        flatMapFunc.asInstanceOf[(Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any]],
+        flatMapFunc.asInstanceOf[(Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any]],
         groupingAttributes,
         dataAttributes,
         OutputMode.Update,
         isMapGroupsWithState = true,
-        KeyedStateTimeout.NoTimeout,
+        GroupStateTimeout.NoTimeout,
         child = logicalPlan))
   }
 
@@ -262,7 +262,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    * For a static batch Dataset, the function will be invoked once per group. For a streaming
    * Dataset, the function will be invoked for each group repeatedly in every trigger, and
    * updates to each group's state will be saved across invocations.
-   * See [[org.apache.spark.sql.streaming.KeyedState]] for more details.
+   * See [[org.apache.spark.sql.streaming.GroupState]] for more details.
    *
    * @tparam S The type of the user-defined state. Must be encodable to Spark SQL types.
    * @tparam U The type of the output objects. Must be encodable to Spark SQL types.
@@ -275,13 +275,13 @@ class KeyValueGroupedDataset[K, V] private[sql](
   @Experimental
   @InterfaceStability.Evolving
   def mapGroupsWithState[S: Encoder, U: Encoder](
-      timeoutConf: KeyedStateTimeout)(
-      func: (K, Iterator[V], KeyedState[S]) => U): Dataset[U] = {
-    val flatMapFunc = (key: K, it: Iterator[V], s: KeyedState[S]) => Iterator(func(key, it, s))
+      timeoutConf: GroupStateTimeout)(
+      func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
+    val flatMapFunc = (key: K, it: Iterator[V], s: GroupState[S]) => Iterator(func(key, it, s))
     Dataset[U](
       sparkSession,
       FlatMapGroupsWithState[K, V, S, U](
-        flatMapFunc.asInstanceOf[(Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any]],
+        flatMapFunc.asInstanceOf[(Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any]],
         groupingAttributes,
         dataAttributes,
         OutputMode.Update,
@@ -298,7 +298,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    * For a static batch Dataset, the function will be invoked once per group. For a streaming
    * Dataset, the function will be invoked for each group repeatedly in every trigger, and
    * updates to each group's state will be saved across invocations.
-   * See [[KeyedState]] for more details.
+   * See [[GroupState]] for more details.
    *
    * @tparam S The type of the user-defined state. Must be encodable to Spark SQL types.
    * @tparam U The type of the output objects. Must be encodable to Spark SQL types.
@@ -316,7 +316,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
       stateEncoder: Encoder[S],
       outputEncoder: Encoder[U]): Dataset[U] = {
     mapGroupsWithState[S, U](
-      (key: K, it: Iterator[V], s: KeyedState[S]) => func.call(key, it.asJava, s)
+      (key: K, it: Iterator[V], s: GroupState[S]) => func.call(key, it.asJava, s)
     )(stateEncoder, outputEncoder)
   }
 
@@ -328,7 +328,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    * For a static batch Dataset, the function will be invoked once per group. For a streaming
    * Dataset, the function will be invoked for each group repeatedly in every trigger, and
    * updates to each group's state will be saved across invocations.
-   * See [[KeyedState]] for more details.
+   * See [[GroupState]] for more details.
    *
    * @tparam S The type of the user-defined state. Must be encodable to Spark SQL types.
    * @tparam U The type of the output objects. Must be encodable to Spark SQL types.
@@ -346,9 +346,9 @@ class KeyValueGroupedDataset[K, V] private[sql](
       func: MapGroupsWithStateFunction[K, V, S, U],
       stateEncoder: Encoder[S],
       outputEncoder: Encoder[U],
-      timeoutConf: KeyedStateTimeout): Dataset[U] = {
+      timeoutConf: GroupStateTimeout): Dataset[U] = {
     mapGroupsWithState[S, U](
-      (key: K, it: Iterator[V], s: KeyedState[S]) => func.call(key, it.asJava, s)
+      (key: K, it: Iterator[V], s: GroupState[S]) => func.call(key, it.asJava, s)
     )(stateEncoder, outputEncoder)
   }
 
@@ -360,7 +360,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    * For a static batch Dataset, the function will be invoked once per group. For a streaming
    * Dataset, the function will be invoked for each group repeatedly in every trigger, and
    * updates to each group's state will be saved across invocations.
-   * See [[KeyedState]] for more details.
+   * See [[GroupState]] for more details.
    *
    * @tparam S The type of the user-defined state. Must be encodable to Spark SQL types.
    * @tparam U The type of the output objects. Must be encodable to Spark SQL types.
@@ -375,15 +375,15 @@ class KeyValueGroupedDataset[K, V] private[sql](
   @InterfaceStability.Evolving
   def flatMapGroupsWithState[S: Encoder, U: Encoder](
       outputMode: OutputMode,
-      timeoutConf: KeyedStateTimeout)(
-      func: (K, Iterator[V], KeyedState[S]) => Iterator[U]): Dataset[U] = {
+      timeoutConf: GroupStateTimeout)(
+      func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U] = {
     if (outputMode != OutputMode.Append && outputMode != OutputMode.Update) {
       throw new IllegalArgumentException("The output mode of function should be append or update")
     }
     Dataset[U](
       sparkSession,
       FlatMapGroupsWithState[K, V, S, U](
-        func.asInstanceOf[(Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any]],
+        func.asInstanceOf[(Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any]],
         groupingAttributes,
         dataAttributes,
         outputMode,
@@ -400,7 +400,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    * For a static batch Dataset, the function will be invoked once per group. For a streaming
    * Dataset, the function will be invoked for each group repeatedly in every trigger, and
    * updates to each group's state will be saved across invocations.
-   * See [[KeyedState]] for more details.
+   * See [[GroupState]] for more details.
    *
    * @tparam S The type of the user-defined state. Must be encodable to Spark SQL types.
    * @tparam U The type of the output objects. Must be encodable to Spark SQL types.
@@ -420,8 +420,8 @@ class KeyValueGroupedDataset[K, V] private[sql](
       outputMode: OutputMode,
       stateEncoder: Encoder[S],
       outputEncoder: Encoder[U],
-      timeoutConf: KeyedStateTimeout): Dataset[U] = {
-    val f = (key: K, it: Iterator[V], s: KeyedState[S]) => func.call(key, it.asJava, s).asScala
+      timeoutConf: GroupStateTimeout): Dataset[U] = {
+    val f = (key: K, it: Iterator[V], s: GroupState[S]) => func.call(key, it.asJava, s).asScala
     flatMapGroupsWithState[S, U](outputMode, timeoutConf)(f)(stateEncoder, outputEncoder)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index fdd1bcc..48c7b80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -31,8 +31,8 @@ import org.apache.spark.sql.catalyst.expressions.objects.Invoke
 import org.apache.spark.sql.catalyst.plans.logical.FunctionUtils
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.plans.logical.LogicalKeyedState
-import org.apache.spark.sql.execution.streaming.KeyedStateImpl
+import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState
+import org.apache.spark.sql.execution.streaming.GroupStateImpl
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -355,14 +355,14 @@ case class MapGroupsExec(
 
 object MapGroupsExec {
   def apply(
-      func: (Any, Iterator[Any], LogicalKeyedState[Any]) => TraversableOnce[Any],
+      func: (Any, Iterator[Any], LogicalGroupState[Any]) => TraversableOnce[Any],
       keyDeserializer: Expression,
       valueDeserializer: Expression,
       groupingAttributes: Seq[Attribute],
       dataAttributes: Seq[Attribute],
       outputObjAttr: Attribute,
       child: SparkPlan): MapGroupsExec = {
-    val f = (key: Any, values: Iterator[Any]) => func(key, values, new KeyedStateImpl[Any](None))
+    val f = (key: Any, values: Iterator[Any]) => func(key, values, new GroupStateImpl[Any](None))
     new MapGroupsExec(f, keyDeserializer, valueDeserializer,
       groupingAttributes, dataAttributes, outputObjAttr, child)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index 52ad70c..c7262ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -23,9 +23,9 @@ import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Attribut
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution}
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.streaming.KeyedStateImpl.NO_TIMESTAMP
+import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP
 import org.apache.spark.sql.execution.streaming.state._
-import org.apache.spark.sql.streaming.{KeyedStateTimeout, OutputMode}
+import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.util.CompletionIterator
 
@@ -44,7 +44,7 @@ import org.apache.spark.util.CompletionIterator
  * @param batchTimestampMs processing timestamp of the current batch.
  */
 case class FlatMapGroupsWithStateExec(
-    func: (Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any],
+    func: (Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any],
     keyDeserializer: Expression,
     valueDeserializer: Expression,
     groupingAttributes: Seq[Attribute],
@@ -53,13 +53,13 @@ case class FlatMapGroupsWithStateExec(
     stateId: Option[OperatorStateId],
     stateEncoder: ExpressionEncoder[Any],
     outputMode: OutputMode,
-    timeoutConf: KeyedStateTimeout,
+    timeoutConf: GroupStateTimeout,
     batchTimestampMs: Option[Long],
     override val eventTimeWatermark: Option[Long],
     child: SparkPlan
   ) extends UnaryExecNode with ObjectProducerExec with StateStoreWriter with WatermarkSupport {
 
-  import KeyedStateImpl._
+  import GroupStateImpl._
 
   private val isTimeoutEnabled = timeoutConf != NoTimeout
   private val timestampTimeoutAttribute =
@@ -147,7 +147,7 @@ case class FlatMapGroupsWithStateExec(
     private val stateSerializer = {
       val encoderSerializer = stateEncoder.namedExpressions
       if (isTimeoutEnabled) {
-        encoderSerializer :+ Literal(KeyedStateImpl.NO_TIMESTAMP)
+        encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
       } else {
         encoderSerializer
       }
@@ -211,7 +211,7 @@ case class FlatMapGroupsWithStateExec(
       val keyObj = getKeyObj(keyRow)  // convert key to objects
       val valueObjIter = valueRowIter.map(getValueObj.apply) // convert value rows to objects
       val stateObjOption = getStateObj(prevStateRowOption)
-      val keyedState = new KeyedStateImpl(
+      val keyedState = new GroupStateImpl(
         stateObjOption,
         batchTimestampMs.getOrElse(NO_TIMESTAMP),
         eventTimeWatermark.getOrElse(NO_TIMESTAMP),
@@ -247,7 +247,7 @@ case class FlatMapGroupsWithStateExec(
 
           if (shouldWriteState) {
             if (stateRowToWrite == null) {
-              // This should never happen because checks in KeyedStateImpl should avoid cases
+              // This should never happen because checks in GroupStateImpl should avoid cases
               // where empty state would need to be written
               throw new IllegalStateException("Attempting to write empty state")
             }

http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
new file mode 100644
index 0000000..148d922
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.sql.Date
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout}
+import org.apache.spark.sql.execution.streaming.GroupStateImpl._
+import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Internal implementation of the [[GroupState]] interface. Methods are not thread-safe.
+ * @param optionalValue         Initial value of the state, or None if it is not defined
+ * @param batchProcessingTimeMs Processing time of current batch, used to calculate timestamp
+ *                              for processing time timeouts; NO_TIMESTAMP in batch queries
+ * @param eventTimeWatermarkMs  Current event time watermark, or NO_TIMESTAMP if there is none;
+ *                              timeout timestamps earlier than it are rejected
+ * @param timeoutConf           Type of timeout configured. Based on this, different operations
+ *                              will be supported.
+ * @param hasTimedOut           Whether the group for which this state is created has timed out
+ */
+private[sql] class GroupStateImpl[S](
+    optionalValue: Option[S],
+    batchProcessingTimeMs: Long,
+    eventTimeWatermarkMs: Long,
+    timeoutConf: GroupStateTimeout,
+    override val hasTimedOut: Boolean) extends GroupState[S] {
+
+  // Constructor for dummy state in batch mapGroupsWithState queries: no timestamps or timeout
+  def this(optionalValue: Option[S]) = this(
+    optionalValue,
+    batchProcessingTimeMs = NO_TIMESTAMP,
+    eventTimeWatermarkMs = NO_TIMESTAMP,
+    timeoutConf = GroupStateTimeout.NoTimeout,
+    hasTimedOut = false)
+  private var value: S = optionalValue.getOrElse(null.asInstanceOf[S])
+  private var defined: Boolean = optionalValue.isDefined
+  private var updated: Boolean = false // whether value has been updated (but not removed)
+  private var removed: Boolean = false // whether value has been removed
+  private var timeoutTimestamp: Long = NO_TIMESTAMP  // NO_TIMESTAMP when no timeout is set
+
+  // ========= Public API =========
+  override def exists: Boolean = defined
+
+  override def get: S = {
+    if (defined) {
+      value
+    } else {
+      throw new NoSuchElementException("State is either not defined or has already been removed")
+    }
+  }
+
+  override def getOption: Option[S] = {
+    if (defined) {
+      Some(value)
+    } else {
+      None
+    }
+  }
+
+  override def update(newValue: S): Unit = {
+    if (newValue == null) {
+      throw new IllegalArgumentException("'null' is not a valid state value")
+    }
+    value = newValue
+    defined = true
+    updated = true
+    removed = false
+  }
+
+  override def remove(): Unit = {
+    defined = false
+    updated = false
+    removed = true
+    timeoutTimestamp = NO_TIMESTAMP
+  }
+
+  override def setTimeoutDuration(durationMs: Long): Unit = {
+    if (timeoutConf != ProcessingTimeTimeout) {
+      throw new UnsupportedOperationException(
+        "Cannot set timeout duration without enabling processing time timeout in " +
+          "map/flatMapGroupsWithState")
+    }
+    if (!defined) {
+      throw new IllegalStateException(
+        "Cannot set timeout information without any state value, " +
+          "state has either not been initialized, or has already been removed")
+    }
+
+    if (durationMs <= 0) {
+      throw new IllegalArgumentException("Timeout duration must be positive")
+    }
+    if (!removed && batchProcessingTimeMs != NO_TIMESTAMP) {
+      timeoutTimestamp = durationMs + batchProcessingTimeMs
+    } else {
+      // This is being called in a batch query, hence no processing timestamp.
+      // Just ignore any attempts to set timeout.
+    }
+  }
+
+  override def setTimeoutDuration(duration: String): Unit = {
+    setTimeoutDuration(parseDuration(duration))
+  }
+
+  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
+  @throws[IllegalStateException]("when state is either not initialized, or already removed")
+  @throws[UnsupportedOperationException](
+    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+  override def setTimeoutTimestamp(timestampMs: Long): Unit = {
+    checkTimeoutTimestampAllowed()
+    if (timestampMs <= 0) {
+      throw new IllegalArgumentException("Timeout timestamp must be positive")
+    }
+    if (eventTimeWatermarkMs != NO_TIMESTAMP && timestampMs < eventTimeWatermarkMs) {
+      throw new IllegalArgumentException(
+        s"Timeout timestamp ($timestampMs) cannot be earlier than the " +
+          s"current watermark ($eventTimeWatermarkMs)")
+    }
+    if (!removed && batchProcessingTimeMs != NO_TIMESTAMP) {
+      timeoutTimestamp = timestampMs
+    } else {
+      // This is being called in a batch query, hence no processing timestamp.
+      // Just ignore any attempts to set timeout.
+    }
+  }
+
+  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
+  @throws[IllegalStateException]("when state is either not initialized, or already removed")
+  @throws[UnsupportedOperationException](
+    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+  override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
+    checkTimeoutTimestampAllowed()
+    setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
+  }
+
+  @throws[IllegalStateException]("when state is either not initialized, or already removed")
+  @throws[UnsupportedOperationException](
+    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+  override def setTimeoutTimestamp(timestamp: Date): Unit = {
+    checkTimeoutTimestampAllowed()
+    setTimeoutTimestamp(timestamp.getTime)
+  }
+
+  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
+  @throws[IllegalStateException]("when state is either not initialized, or already removed")
+  @throws[UnsupportedOperationException](
+    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+  override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
+    checkTimeoutTimestampAllowed()
+    setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
+  }
+
+  override def toString: String = {
+    s"GroupState(${getOption.map(_.toString).getOrElse("<undefined>")})"
+  }
+
+  // ========= Internal API =========
+
+  /** Whether the state has been marked for removal */
+  def hasRemoved: Boolean = removed
+
+  /** Whether the state has been updated */
+  def hasUpdated: Boolean = updated
+
+  /** Return timeout timestamp or `NO_TIMESTAMP` if not set */
+  def getTimeoutTimestamp: Long = timeoutTimestamp
+
+  private def parseDuration(duration: String): Long = {
+    if (StringUtils.isBlank(duration)) {
+      throw new IllegalArgumentException(
+        "Provided duration is null or blank.")
+    }
+    val intervalString = if (duration.startsWith("interval")) {
+      duration
+    } else {
+      "interval " + duration
+    }
+    val cal = CalendarInterval.fromString(intervalString)
+    if (cal == null) {
+      throw new IllegalArgumentException(
+        s"Provided duration ($duration) is not valid.")
+    }
+    if (cal.milliseconds < 0 || cal.months < 0) {
+      throw new IllegalArgumentException(s"Provided duration ($duration) is not positive")
+    }
+
+    val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31  // months count as 31 days
+    cal.milliseconds + cal.months * millisPerMonth
+  }
+
+  private def checkTimeoutTimestampAllowed(): Unit = {
+    if (timeoutConf != EventTimeTimeout) {
+      throw new UnsupportedOperationException(
+        "Cannot set timeout timestamp without enabling event time timeout in " +
+          "map/flatMapGroupsWithState")
+    }
+    if (!defined) {
+      throw new IllegalStateException(
+        "Cannot set timeout timestamp without any state value, " +
+          "state has either not been initialized, or has already been removed")
+    }
+  }
+}
+
+
+private[sql] object GroupStateImpl {
+  // Value used to represent the lack of a valid timestamp (e.g. no timeout set, or batch query)
+  val NO_TIMESTAMP = -1L
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/KeyedStateImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/KeyedStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/KeyedStateImpl.scala
deleted file mode 100644
index edfd35b..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/KeyedStateImpl.scala
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import java.sql.Date
-
-import org.apache.commons.lang3.StringUtils
-
-import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout}
-import org.apache.spark.sql.execution.streaming.KeyedStateImpl._
-import org.apache.spark.sql.streaming.{KeyedState, KeyedStateTimeout}
-import org.apache.spark.unsafe.types.CalendarInterval
-
-
-/**
- * Internal implementation of the [[KeyedState]] interface. Methods are not thread-safe.
- * @param optionalValue Optional value of the state
- * @param batchProcessingTimeMs Processing time of current batch, used to calculate timestamp
- *                              for processing time timeouts
- * @param timeoutConf     Type of timeout configured. Based on this, different operations will
- *                        be supported.
- * @param hasTimedOut     Whether the key for which this state wrapped is being created is
- *                        getting timed out or not.
- */
-private[sql] class KeyedStateImpl[S](
-    optionalValue: Option[S],
-    batchProcessingTimeMs: Long,
-    eventTimeWatermarkMs: Long,
-    timeoutConf: KeyedStateTimeout,
-    override val hasTimedOut: Boolean) extends KeyedState[S] {
-
-  // Constructor to create dummy state when using mapGroupsWithState in a batch query
-  def this(optionalValue: Option[S]) = this(
-    optionalValue,
-    batchProcessingTimeMs = NO_TIMESTAMP,
-    eventTimeWatermarkMs = NO_TIMESTAMP,
-    timeoutConf = KeyedStateTimeout.NoTimeout,
-    hasTimedOut = false)
-  private var value: S = optionalValue.getOrElse(null.asInstanceOf[S])
-  private var defined: Boolean = optionalValue.isDefined
-  private var updated: Boolean = false // whether value has been updated (but not removed)
-  private var removed: Boolean = false // whether value has been removed
-  private var timeoutTimestamp: Long = NO_TIMESTAMP
-
-  // ========= Public API =========
-  override def exists: Boolean = defined
-
-  override def get: S = {
-    if (defined) {
-      value
-    } else {
-      throw new NoSuchElementException("State is either not defined or has already been removed")
-    }
-  }
-
-  override def getOption: Option[S] = {
-    if (defined) {
-      Some(value)
-    } else {
-      None
-    }
-  }
-
-  override def update(newValue: S): Unit = {
-    if (newValue == null) {
-      throw new IllegalArgumentException("'null' is not a valid state value")
-    }
-    value = newValue
-    defined = true
-    updated = true
-    removed = false
-  }
-
-  override def remove(): Unit = {
-    defined = false
-    updated = false
-    removed = true
-    timeoutTimestamp = NO_TIMESTAMP
-  }
-
-  override def setTimeoutDuration(durationMs: Long): Unit = {
-    if (timeoutConf != ProcessingTimeTimeout) {
-      throw new UnsupportedOperationException(
-        "Cannot set timeout duration without enabling processing time timeout in " +
-          "map/flatMapGroupsWithState")
-    }
-    if (!defined) {
-      throw new IllegalStateException(
-        "Cannot set timeout information without any state value, " +
-          "state has either not been initialized, or has already been removed")
-    }
-
-    if (durationMs <= 0) {
-      throw new IllegalArgumentException("Timeout duration must be positive")
-    }
-    if (!removed && batchProcessingTimeMs != NO_TIMESTAMP) {
-      timeoutTimestamp = durationMs + batchProcessingTimeMs
-    } else {
-      // This is being called in a batch query, hence no processing timestamp.
-      // Just ignore any attempts to set timeout.
-    }
-  }
-
-  override def setTimeoutDuration(duration: String): Unit = {
-    setTimeoutDuration(parseDuration(duration))
-  }
-
-  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
-  override def setTimeoutTimestamp(timestampMs: Long): Unit = {
-    checkTimeoutTimestampAllowed()
-    if (timestampMs <= 0) {
-      throw new IllegalArgumentException("Timeout timestamp must be positive")
-    }
-    if (eventTimeWatermarkMs != NO_TIMESTAMP && timestampMs < eventTimeWatermarkMs) {
-      throw new IllegalArgumentException(
-        s"Timeout timestamp ($timestampMs) cannot be earlier than the " +
-          s"current watermark ($eventTimeWatermarkMs)")
-    }
-    if (!removed && batchProcessingTimeMs != NO_TIMESTAMP) {
-      timeoutTimestamp = timestampMs
-    } else {
-      // This is being called in a batch query, hence no processing timestamp.
-      // Just ignore any attempts to set timeout.
-    }
-  }
-
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
-  override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
-    checkTimeoutTimestampAllowed()
-    setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
-  }
-
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
-  override def setTimeoutTimestamp(timestamp: Date): Unit = {
-    checkTimeoutTimestampAllowed()
-    setTimeoutTimestamp(timestamp.getTime)
-  }
-
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
-  override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
-    checkTimeoutTimestampAllowed()
-    setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
-  }
-
-  override def toString: String = {
-    s"KeyedState(${getOption.map(_.toString).getOrElse("<undefined>")})"
-  }
-
-  // ========= Internal API =========
-
-  /** Whether the state has been marked for removing */
-  def hasRemoved: Boolean = removed
-
-  /** Whether the state has been updated */
-  def hasUpdated: Boolean = updated
-
-  /** Return timeout timestamp or `TIMEOUT_TIMESTAMP_NOT_SET` if not set */
-  def getTimeoutTimestamp: Long = timeoutTimestamp
-
-  private def parseDuration(duration: String): Long = {
-    if (StringUtils.isBlank(duration)) {
-      throw new IllegalArgumentException(
-        "Provided duration is null or blank.")
-    }
-    val intervalString = if (duration.startsWith("interval")) {
-      duration
-    } else {
-      "interval " + duration
-    }
-    val cal = CalendarInterval.fromString(intervalString)
-    if (cal == null) {
-      throw new IllegalArgumentException(
-        s"Provided duration ($duration) is not valid.")
-    }
-    if (cal.milliseconds < 0 || cal.months < 0) {
-      throw new IllegalArgumentException(s"Provided duration ($duration) is not positive")
-    }
-
-    val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
-    cal.milliseconds + cal.months * millisPerMonth
-  }
-
-  private def checkTimeoutTimestampAllowed(): Unit = {
-    if (timeoutConf != EventTimeTimeout) {
-      throw new UnsupportedOperationException(
-        "Cannot set timeout timestamp without enabling event time timeout in " +
-          "map/flatMapGroupsWithState")
-    }
-    if (!defined) {
-      throw new IllegalStateException(
-        "Cannot set timeout timestamp without any state value, " +
-          "state has either not been initialized, or has already been removed")
-    }
-  }
-}
-
-
-private[sql] object KeyedStateImpl {
-  // Value used represent the lack of valid timestamp as a long
-  val NO_TIMESTAMP = -1L
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index f72144a..8dbda29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -23,13 +23,13 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, Predicate}
-import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalKeyedState, ProcessingTimeTimeout}
+import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalGroupState, ProcessingTimeTimeout}
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.streaming.state._
-import org.apache.spark.sql.streaming.{KeyedStateTimeout, OutputMode}
+import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.CompletionIterator
 

http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
new file mode 100644
index 0000000..60a4d0d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.{Encoder, KeyValueGroupedDataset}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState
+
+/**
+ * :: Experimental ::
+ *
+ * Wrapper class for interacting with per-group state data in `mapGroupsWithState` and
+ * `flatMapGroupsWithState` operations on [[KeyValueGroupedDataset]].
+ *
+ * Detailed description of the `[map/flatMap]GroupsWithState` operation
+ * --------------------------------------------------------------------
+ * Both, `mapGroupsWithState` and `flatMapGroupsWithState` in [[KeyValueGroupedDataset]]
+ * will invoke the user-given function on each group (defined by the grouping function in
+ * `Dataset.groupByKey()`) while maintaining user-defined per-group state between invocations.
+ * For a static batch Dataset, the function will be invoked once per group. For a streaming
+ * Dataset, the function will be invoked for each group repeatedly in every trigger.
+ * That is, in every batch of the `streaming.StreamingQuery`,
+ * the function will be invoked once for each group that has data in the trigger. Furthermore,
+ * if timeout is set, then the function will be invoked on timed out groups (more detail below).
+ *
+ * The function is invoked with following parameters.
+ *  - The key of the group.
+ *  - An iterator containing all the values for this group.
+ *  - A user-defined state object set by previous invocations of the given function.
+ * In case of a batch Dataset, there is only one invocation and state object will be empty as
+ * there is no prior state. Essentially, for batch Datasets, `[map/flatMap]GroupsWithState`
+ * is equivalent to `[map/flatMap]Groups` and any updates to the state and/or timeouts have
+ * no effect.
+ *
+ * Important points to note about the function.
+ *  - In a trigger, the function will be called only for the groups present in the batch. So do not
+ *    assume that the function will be called in every trigger for every group that has state.
+ *  - There is no guaranteed ordering of values in the iterator in the function, neither with
+ *    batch, nor with streaming Datasets.
+ *  - All the data will be shuffled before applying the function.
+ *  - If timeout is set, then the function will also be called with no values.
+ *    See more details on `GroupStateTimeout` below.
+ *
+ * Important points to note about using `GroupState`.
+ *  - The value of the state cannot be null. So updating state with null will throw
+ *    `IllegalArgumentException`.
+ *  - Operations on `GroupState` are not thread-safe. This is to avoid memory barriers.
+ *  - If `remove()` is called, then `exists()` will return `false`,
+ *    `get()` will throw `NoSuchElementException` and `getOption()` will return `None`
+ *  - After that, if `update(newState)` is called, then `exists()` will again return `true`,
+ *    `get()` and `getOption()` will return the updated value.
+ *
+ * Important points to note about using `GroupStateTimeout`.
+ *  - The timeout type is a global param across all the groups (set as `timeout` param in
+ *    `[map|flatMap]GroupsWithState`), but the exact timeout duration/timestamp is configurable per
+ *    group by calling `setTimeout...()` in `GroupState`.
+ *  - Timeouts can be either based on processing time (i.e.
+ *    [[GroupStateTimeout.ProcessingTimeTimeout]]) or event time (i.e.
+ *    [[GroupStateTimeout.EventTimeTimeout]]).
+ *  - With `ProcessingTimeTimeout`, the timeout duration can be set by calling
+ *    `GroupState.setTimeoutDuration`. The timeout will occur when the clock has advanced by the set
+ *    duration. Guarantees provided by this timeout with a duration of D ms are as follows:
+ *    - Timeout will never occur before the clock time has advanced by D ms
+ *    - Timeout will occur eventually when there is a trigger in the query
+ *      (i.e. after D ms). So there is no strict upper bound on when the timeout would occur.
+ *      For example, the trigger interval of the query will affect when the timeout actually occurs.
+ *      If there is no data in the stream (for any group) for a while, then there will not be
+ *      any trigger and timeout function call will not occur until there is data.
+ *    - Since the processing time timeout is based on the clock time, it is affected by the
+ *      variations in the system clock (i.e. time zone changes, clock skew, etc.).
+ *  - With `EventTimeTimeout`, the user also has to specify the event time watermark in
+ *    the query using `Dataset.withWatermark()`. With this setting, data that is older than the
+ *    watermark is filtered out. The timeout can be set for a group by setting a timeout timestamp
+ *    using `GroupState.setTimeoutTimestamp()`, and the timeout would occur when the watermark
+ *    advances beyond the set timestamp. You can control the timeout delay by two parameters -
+ *    (i) watermark delay and (ii) an additional duration beyond the timestamp in the event (which
+ *    is guaranteed to be newer than watermark due to the filtering). Guarantees provided by this
+ *    timeout are as follows:
+ *    - Timeout will never occur before watermark has exceeded the set timeout.
+ *    - Similar to processing time timeouts, there is no strict upper bound on the delay when
+ *      the timeout actually occurs. The watermark can advance only when there is data in the
+ *      stream, and the event time of the data has actually advanced.
+ *  - When the timeout occurs for a group, the function is called for that group with no values, and
+ *    `GroupState.hasTimedOut()` set to true.
+ *  - The timeout is reset every time the function is called on a group, that is,
+ *    when the group has new data, or the group has timed out. So the user has to set the timeout
+ *    duration every time the function is called, otherwise there will not be any timeout set.
+ *
+ * Scala example of using GroupState in `mapGroupsWithState`:
+ * {{{
+ * // A mapping function that maintains an integer state for string keys and returns a string.
+ * // Additionally, it sets a timeout to remove the state if it has not received data for an hour.
+ * def mappingFunction(key: String, value: Iterator[Int], state: GroupState[Int]): String = {
+ *
+ *   if (state.hasTimedOut) {                // If called when timing out, remove the state
+ *     state.remove()
+ *
+ *   } else if (state.exists) {              // If state exists, use it for processing
+ *     val existingState = state.get         // Get the existing state
+ *     val shouldRemove = ...                // Decide whether to remove the state
+ *     if (shouldRemove) {
+ *       state.remove()                      // Remove the state
+ *
+ *     } else {
+ *       val newState = ...
+ *       state.update(newState)              // Set the new state
+ *       state.setTimeoutDuration("1 hour")  // Set the timeout
+ *     }
+ *
+ *   } else {
+ *     val initialState = ...
+ *     state.update(initialState)            // Set the initial state
+ *     state.setTimeoutDuration("1 hour")    // Set the timeout
+ *   }
+ *   ...
+ *   // return something
+ * }
+ *
+ * dataset
+ *   .groupByKey(...)
+ *   .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(mappingFunction)
+ * }}}
+ *
+ * Java example of using `GroupState`:
+ * {{{
+ * // A mapping function that maintains an integer state for string keys and returns a string.
+ * // Additionally, it sets a timeout to remove the state if it has not received data for an hour.
+ * MapGroupsWithStateFunction<String, Integer, Integer, String> mappingFunction =
+ *    new MapGroupsWithStateFunction<String, Integer, Integer, String>() {
+ *
+ *      @Override
+ *      public String call(String key, Iterator<Integer> value, GroupState<Integer> state) {
+ *        if (state.hasTimedOut()) {            // If called when timing out, remove the state
+ *          state.remove();
+ *
+ *        } else if (state.exists()) {            // If state exists, use it for processing
+ *          int existingState = state.get();      // Get the existing state
+ *          boolean shouldRemove = ...;           // Decide whether to remove the state
+ *          if (shouldRemove) {
+ *            state.remove();                     // Remove the state
+ *
+ *          } else {
+ *            int newState = ...;
+ *            state.update(newState);             // Set the new state
+ *            state.setTimeoutDuration("1 hour"); // Set the timeout
+ *          }
+ *
+ *        } else {
+ *          int initialState = ...;               // Set the initial state
+ *          state.update(initialState);
+ *          state.setTimeoutDuration("1 hour");   // Set the timeout
+ *        }
+ *        ...
+ *        // return something
+ *      }
+ *    };
+ *
+ * dataset
+ *     .groupByKey(...)
+ *     .mapGroupsWithState(
+ *         mappingFunction, Encoders.INT, Encoders.STRING, GroupStateTimeout.ProcessingTimeTimeout);
+ * }}}
+ *
+ * @tparam S User-defined type of the state to be stored for each group. Must be encodable into
+ *           Spark SQL types (see [[Encoder]] for more details).
+ * @since 2.2.0
+ */
+@Experimental
+@InterfaceStability.Evolving
+trait GroupState[S] extends LogicalGroupState[S] {
+
+  /** Whether state exists or not. */
+  def exists: Boolean
+
+  /** Get the state value if it exists, or throw NoSuchElementException. */
+  @throws[NoSuchElementException]("when state does not exist")
+  def get: S
+
+  /** Get the state value as a Scala Option, which is `None` if state does not exist. */
+  def getOption: Option[S]
+
+  /**
+   * Update the value of the state. Note that `null` is not a valid value, and it throws
+   * IllegalArgumentException.
+   */
+  @throws[IllegalArgumentException]("when updating with null")
+  def update(newState: S): Unit
+
+  /** Remove this state. Note that this resets any timeout configuration as well. */
+  def remove(): Unit
+
+  /**
+   * Whether the function has been called because the key has timed out.
+   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
+   */
+  def hasTimedOut: Boolean
+
+  /**
+   * Set the timeout duration in ms for this key.
+   *
+   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   */
+  @throws[IllegalArgumentException]("if 'durationMs' is not positive")
+  @throws[IllegalStateException]("when state is either not initialized, or already removed")
+  @throws[UnsupportedOperationException](
+    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+  def setTimeoutDuration(durationMs: Long): Unit
+
+  /**
+   * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
+   *
+   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   */
+  @throws[IllegalArgumentException]("if 'duration' is not a valid duration")
+  @throws[IllegalStateException]("when state is either not initialized, or already removed")
+  @throws[UnsupportedOperationException](
+    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+  def setTimeoutDuration(duration: String): Unit
+
+  /**
+   * Set the timeout timestamp for this key as milliseconds in epoch time.
+   * This timestamp cannot be older than the current watermark.
+   *
+   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   */
+  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
+  @throws[IllegalStateException]("when state is either not initialized, or already removed")
+  @throws[UnsupportedOperationException](
+    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+  def setTimeoutTimestamp(timestampMs: Long): Unit
+
+  /**
+   * Set the timeout timestamp for this key as milliseconds in epoch time and an additional
+   * duration as a string (e.g. "1 hour", "2 days", etc.).
+   * The final timestamp (including the additional duration) cannot be older than the
+   * current watermark.
+   *
+   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   */
+  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
+  @throws[IllegalStateException]("when state is either not initialized, or already removed")
+  @throws[UnsupportedOperationException](
+    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+  def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit
+
+  /**
+   * Set the timeout timestamp for this key as a java.sql.Date.
+   * This timestamp cannot be older than the current watermark.
+   *
+   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   */
+  @throws[IllegalStateException]("when state is either not initialized, or already removed")
+  @throws[UnsupportedOperationException](
+    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+  def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
+
+  /**
+   * Set the timeout timestamp for this key as a java.sql.Date and an additional
+   * duration as a string (e.g. "1 hour", "2 days", etc.).
+   * The final timestamp (including the additional duration) cannot be older than the
+   * current watermark.
+   *
+   * @note EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
+   */
+  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
+  @throws[IllegalStateException]("when state is either not initialized, or already removed")
+  @throws[UnsupportedOperationException](
+    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+  def setTimeoutTimestamp(timestamp: java.sql.Date, additionalDuration: String): Unit
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/scala/org/apache/spark/sql/streaming/KeyedState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/KeyedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/KeyedState.scala
deleted file mode 100644
index 461de04..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/KeyedState.scala
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.apache.spark.annotation.{Experimental, InterfaceStability}
-import org.apache.spark.sql.{Encoder, KeyValueGroupedDataset}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalKeyedState
-
-/**
- * :: Experimental ::
- *
- * Wrapper class for interacting with keyed state data in `mapGroupsWithState` and
- * `flatMapGroupsWithState` operations on
- * [[KeyValueGroupedDataset]].
- *
- * Detail description on `[map/flatMap]GroupsWithState` operation
- * --------------------------------------------------------------
- * Both, `mapGroupsWithState` and `flatMapGroupsWithState` in [[KeyValueGroupedDataset]]
- * will invoke the user-given function on each group (defined by the grouping function in
- * `Dataset.groupByKey()`) while maintaining user-defined per-group state between invocations.
- * For a static batch Dataset, the function will be invoked once per group. For a streaming
- * Dataset, the function will be invoked for each group repeatedly in every trigger.
- * That is, in every batch of the `streaming.StreamingQuery`,
- * the function will be invoked once for each group that has data in the trigger. Furthermore,
- * if timeout is set, then the function will invoked on timed out keys (more detail below).
- *
- * The function is invoked with following parameters.
- *  - The key of the group.
- *  - An iterator containing all the values for this key.
- *  - A user-defined state object set by previous invocations of the given function.
- * In case of a batch Dataset, there is only one invocation and state object will be empty as
- * there is no prior state. Essentially, for batch Datasets, `[map/flatMap]GroupsWithState`
- * is equivalent to `[map/flatMap]Groups` and any updates to the state and/or timeouts have
- * no effect.
- *
- * Important points to note about the function.
- *  - In a trigger, the function will be called only the groups present in the batch. So do not
- *    assume that the function will be called in every trigger for every group that has state.
- *  - There is no guaranteed ordering of values in the iterator in the function, neither with
- *    batch, nor with streaming Datasets.
- *  - All the data will be shuffled before applying the function.
- *  - If timeout is set, then the function will also be called with no values.
- *    See more details on `KeyedStateTimeout` below.
- *
- * Important points to note about using `KeyedState`.
- *  - The value of the state cannot be null. So updating state with null will throw
- *    `IllegalArgumentException`.
- *  - Operations on `KeyedState` are not thread-safe. This is to avoid memory barriers.
- *  - If `remove()` is called, then `exists()` will return `false`,
- *    `get()` will throw `NoSuchElementException` and `getOption()` will return `None`
- *  - After that, if `update(newState)` is called, then `exists()` will again return `true`,
- *    `get()` and `getOption()`will return the updated value.
- *
- * Important points to note about using `KeyedStateTimeout`.
- *  - The timeout type is a global param across all the keys (set as `timeout` param in
- *    `[map|flatMap]GroupsWithState`, but the exact timeout duration/timestamp is configurable per
- *    key by calling `setTimeout...()` in `KeyedState`.
- *  - Timeouts can be either based on processing time (i.e.
- *    [[KeyedStateTimeout.ProcessingTimeTimeout]]) or event time (i.e.
- *    [[KeyedStateTimeout.EventTimeTimeout]]).
- *  - With `ProcessingTimeTimeout`, the timeout duration can be set by calling
- *    `KeyedState.setTimeoutDuration`. The timeout will occur when the clock has advanced by the set
- *    duration. Guarantees provided by this timeout with a duration of D ms are as follows:
- *    - Timeout will never be occur before the clock time has advanced by D ms
- *    - Timeout will occur eventually when there is a trigger in the query
- *      (i.e. after D ms). So there is a no strict upper bound on when the timeout would occur.
- *      For example, the trigger interval of the query will affect when the timeout actually occurs.
- *      If there is no data in the stream (for any key) for a while, then their will not be
- *      any trigger and timeout function call will not occur until there is data.
- *    - Since the processing time timeout is based on the clock time, it is affected by the
- *      variations in the system clock (i.e. time zone changes, clock skew, etc.).
- *  - With `EventTimeTimeout`, the user also has to specify the the the event time watermark in
- *    the query using `Dataset.withWatermark()`. With this setting, data that is older than the
- *    watermark are filtered out. The timeout can be enabled for a key by setting a timestamp using
- *    `KeyedState.setTimeoutTimestamp()`, and the timeout would occur when the watermark advances
- *    beyond the set timestamp. You can control the timeout delay by two parameters - (i) watermark
- *    delay and an additional duration beyond the timestamp in the event (which is guaranteed to
- *    > watermark due to the filtering). Guarantees provided by this timeout are as follows:
- *    - Timeout will never be occur before watermark has exceeded the set timeout.
- *    - Similar to processing time timeouts, there is a no strict upper bound on the delay when
- *      the timeout actually occurs. The watermark can advance only when there is data in the
- *      stream, and the event time of the data has actually advanced.
- *  - When the timeout occurs for a key, the function is called for that key with no values, and
- *    `KeyedState.hasTimedOut()` set to true.
- *  - The timeout is reset for key every time the function is called on the key, that is,
- *    when the key has new data, or the key has timed out. So the user has to set the timeout
- *    duration every time the function is called, otherwise there will not be any timeout set.
- *
- * Scala example of using KeyedState in `mapGroupsWithState`:
- * {{{
- * // A mapping function that maintains an integer state for string keys and returns a string.
- * // Additionally, it sets a timeout to remove the state if it has not received data for an hour.
- * def mappingFunction(key: String, value: Iterator[Int], state: KeyedState[Int]): String = {
- *
- *   if (state.hasTimedOut) {                // If called when timing out, remove the state
- *     state.remove()
- *
- *   } else if (state.exists) {              // If state exists, use it for processing
- *     val existingState = state.get         // Get the existing state
- *     val shouldRemove = ...                // Decide whether to remove the state
- *     if (shouldRemove) {
- *       state.remove()                      // Remove the state
- *
- *     } else {
- *       val newState = ...
- *       state.update(newState)              // Set the new state
- *       state.setTimeoutDuration("1 hour")  // Set the timeout
- *     }
- *
- *   } else {
- *     val initialState = ...
- *     state.update(initialState)            // Set the initial state
- *     state.setTimeoutDuration("1 hour")    // Set the timeout
- *   }
- *   ...
- *   // return something
- * }
- *
- * dataset
- *   .groupByKey(...)
- *   .mapGroupsWithState(KeyedStateTimeout.ProcessingTimeTimeout)(mappingFunction)
- * }}}
- *
- * Java example of using `KeyedState`:
- * {{{
- * // A mapping function that maintains an integer state for string keys and returns a string.
- * // Additionally, it sets a timeout to remove the state if it has not received data for an hour.
- * MapGroupsWithStateFunction<String, Integer, Integer, String> mappingFunction =
- *    new MapGroupsWithStateFunction<String, Integer, Integer, String>() {
- *
- *      @Override
- *      public String call(String key, Iterator<Integer> value, KeyedState<Integer> state) {
- *        if (state.hasTimedOut()) {            // If called when timing out, remove the state
- *          state.remove();
- *
- *        } else if (state.exists()) {            // If state exists, use it for processing
- *          int existingState = state.get();      // Get the existing state
- *          boolean shouldRemove = ...;           // Decide whether to remove the state
- *          if (shouldRemove) {
- *            state.remove();                     // Remove the state
- *
- *          } else {
- *            int newState = ...;
- *            state.update(newState);             // Set the new state
- *            state.setTimeoutDuration("1 hour"); // Set the timeout
- *          }
- *
- *        } else {
- *          int initialState = ...;               // Set the initial state
- *          state.update(initialState);
- *          state.setTimeoutDuration("1 hour");   // Set the timeout
- *        }
- *        ...
-*         // return something
- *      }
- *    };
- *
- * dataset
- *     .groupByKey(...)
- *     .mapGroupsWithState(
- *         mappingFunction, Encoders.INT, Encoders.STRING, KeyedStateTimeout.ProcessingTimeTimeout);
- * }}}
- *
- * @tparam S User-defined type of the state to be stored for each key. Must be encodable into
- *           Spark SQL types (see [[Encoder]] for more details).
- * @since 2.2.0
- */
-@Experimental
-@InterfaceStability.Evolving
-trait KeyedState[S] extends LogicalKeyedState[S] {
-
-  /** Whether state exists or not. */
-  def exists: Boolean
-
-  /** Get the state value if it exists, or throw NoSuchElementException. */
-  @throws[NoSuchElementException]("when state does not exist")
-  def get: S
-
-  /** Get the state value as a scala Option. */
-  def getOption: Option[S]
-
-  /**
-   * Update the value of the state. Note that `null` is not a valid value, and it throws
-   * IllegalArgumentException.
-   */
-  @throws[IllegalArgumentException]("when updating with null")
-  def update(newState: S): Unit
-
-  /** Remove this keyed state. Note that this resets any timeout configuration as well. */
-  def remove(): Unit
-
-  /**
-   * Whether the function has been called because the key has timed out.
-   * @note This can return true only when timeouts are enabled in `[map/flatmap]GroupsWithStates`.
-   */
-  def hasTimedOut: Boolean
-
-  /**
-   * Set the timeout duration in ms for this key.
-   *
-   * @note ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
-   */
-  @throws[IllegalArgumentException]("if 'durationMs' is not positive")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
-  def setTimeoutDuration(durationMs: Long): Unit
-
-  /**
-   * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
-   *
-   * @note, ProcessingTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
-   */
-  @throws[IllegalArgumentException]("if 'duration' is not a valid duration")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
-  def setTimeoutDuration(duration: String): Unit
-
-  @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
-  /**
-   * Set the timeout timestamp for this key as milliseconds in epoch time.
-   * This timestamp cannot be older than the current watermark.
-   *
-   * @note, EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
-   */
-  def setTimeoutTimestamp(timestampMs: Long): Unit
-
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
-  /**
-   * Set the timeout timestamp for this key as milliseconds in epoch time and an additional
-   * duration as a string (e.g. "1 hour", "2 days", etc.).
-   * The final timestamp (including the additional duration) cannot be older than the
-   * current watermark.
-   *
-   * @note, EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
-   */
-  def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit
-
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
-  /**
-   * Set the timeout timestamp for this key as a java.sql.Date.
-   * This timestamp cannot be older than the current watermark.
-   *
-   * @note, EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
-   */
-  def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
-
-  @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
-  @throws[IllegalStateException]("when state is either not initialized, or already removed")
-  @throws[UnsupportedOperationException](
-    "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
-  /**
-   * Set the timeout timestamp for this key as a java.sql.Date and an additional
-   * duration as a string (e.g. "1 hour", "2 days", etc.).
-   * The final timestamp (including the additional duration) cannot be older than the
-   * current watermark.
-   *
-   * @note, EventTimeTimeout must be enabled in `[map/flatmap]GroupsWithStates`.
-   */
-  def setTimeoutTimestamp(timestamp: java.sql.Date, additionalDuration: String): Unit
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index ffb4c62..78cf033 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -23,7 +23,7 @@ import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.*;
 
-import org.apache.spark.sql.streaming.KeyedStateTimeout;
+import org.apache.spark.sql.streaming.GroupStateTimeout;
 import org.apache.spark.sql.streaming.OutputMode;
 import scala.Tuple2;
 import scala.Tuple3;
@@ -210,7 +210,7 @@ public class JavaDatasetSuite implements Serializable {
       OutputMode.Append(),
       Encoders.LONG(),
       Encoders.STRING(),
-      KeyedStateTimeout.NoTimeout());
+      GroupStateTimeout.NoTimeout());
 
     Assert.assertEquals(asSet("1a", "3foobar"), toSet(flatMapped2.collectAsList()));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org