You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2017/03/22 19:28:19 UTC
[2/2] spark git commit: [SPARK-20057][SS] Renamed KeyedState to
GroupState in mapGroupsWithState
[SPARK-20057][SS] Renamed KeyedState to GroupState in mapGroupsWithState
## What changes were proposed in this pull request?
Since the state is tied to a "group" in the "mapGroupsWithState" operations, it's better to call the state "GroupState" rather than naming it after a key ("KeyedState"). This would make it more general if you extend this operation to RelationalGroupedDataset and Python APIs.
## How was this patch tested?
Existing unit tests.
Author: Tathagata Das <ta...@gmail.com>
Closes #17385 from tdas/SPARK-20057.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82b598b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82b598b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82b598b9
Branch: refs/heads/master
Commit: 82b598b963a21ae9d6a2a9638e86b4165c2a78c9
Parents: 80fd070
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Mar 22 12:30:36 2017 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Mar 22 12:30:36 2017 -0700
----------------------------------------------------------------------
.../spark/sql/streaming/GroupStateTimeout.java | 54 ++++
.../spark/sql/streaming/KeyedStateTimeout.java | 54 ----
.../sql/catalyst/plans/logical/object.scala | 18 +-
.../streaming/JavaGroupStateTimeoutSuite.java | 33 +++
.../streaming/JavaKeyedStateTimeoutSuite.java | 29 --
.../FlatMapGroupsWithStateFunction.java | 4 +-
.../function/MapGroupsWithStateFunction.java | 4 +-
.../spark/sql/KeyValueGroupedDataset.scala | 46 +--
.../apache/spark/sql/execution/objects.scala | 8 +-
.../streaming/FlatMapGroupsWithStateExec.scala | 16 +-
.../execution/streaming/GroupStateImpl.scala | 228 +++++++++++++++
.../execution/streaming/KeyedStateImpl.scala | 227 ---------------
.../execution/streaming/statefulOperators.scala | 4 +-
.../apache/spark/sql/streaming/GroupState.scala | 285 +++++++++++++++++++
.../apache/spark/sql/streaming/KeyedState.scala | 285 -------------------
.../org/apache/spark/sql/JavaDatasetSuite.java | 4 +-
.../streaming/FlatMapGroupsWithStateSuite.scala | 122 ++++----
17 files changed, 713 insertions(+), 708 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java
new file mode 100644
index 0000000..bd5e2d7
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.catalyst.plans.logical.*;
+
+/**
+ * Represents the type of timeouts possible for the Dataset operations
+ * `mapGroupsWithState` and `flatMapGroupsWithState`. See documentation on
+ * `GroupState` for more details.
+ *
+ * @since 2.2.0
+ */
+@Experimental
+@InterfaceStability.Evolving
+public class GroupStateTimeout {
+
+ /**
+ * Timeout based on processing time. The duration of timeout can be set for each group in
+ * `map/flatMapGroupsWithState` by calling `GroupState.setTimeoutDuration()`. See documentation
+ * on `GroupState` for more details.
+ */
+ public static GroupStateTimeout ProcessingTimeTimeout() { return ProcessingTimeTimeout$.MODULE$; }
+
+ /**
+ * Timeout based on event-time. The event-time timestamp for timeout can be set for each
+ * group in `map/flatMapGroupsWithState` by calling `GroupState.setTimeoutTimestamp()`.
+ * In addition, you have to define the watermark in the query using `Dataset.withWatermark`.
+ * When the watermark advances beyond the set timestamp of a group and the group has not
+ * received any data, then the group times out. See documentation on
+ * `GroupState` for more details.
+ */
+ public static GroupStateTimeout EventTimeTimeout() { return EventTimeTimeout$.MODULE$; }
+
+ /** No timeout. */
+ public static GroupStateTimeout NoTimeout() { return NoTimeout$.MODULE$; }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java
deleted file mode 100644
index e2e7ab1..0000000
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/KeyedStateTimeout.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming;
-
-import org.apache.spark.annotation.Experimental;
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.catalyst.plans.logical.*;
-
-/**
- * Represents the type of timeouts possible for the Dataset operations
- * `mapGroupsWithState` and `flatMapGroupsWithState`. See documentation on
- * `KeyedState` for more details.
- *
- * @since 2.2.0
- */
-@Experimental
-@InterfaceStability.Evolving
-public class KeyedStateTimeout {
-
- /**
- * Timeout based on processing time. The duration of timeout can be set for each group in
- * `map/flatMapGroupsWithState` by calling `KeyedState.setTimeoutDuration()`. See documentation
- * on `KeyedState` for more details.
- */
- public static KeyedStateTimeout ProcessingTimeTimeout() { return ProcessingTimeTimeout$.MODULE$; }
-
- /**
- * Timeout based on event-time. The event-time timestamp for timeout can be set for each
- * group in `map/flatMapGroupsWithState` by calling `KeyedState.setTimeoutTimestamp()`.
- * In addition, you have to define the watermark in the query using `Dataset.withWatermark`.
- * When the watermark advances beyond the set timestamp of a group and the group has not
- * received any data, then the group times out. See documentation on
- * `KeyedState` for more details.
- */
- public static KeyedStateTimeout EventTimeTimeout() { return EventTimeTimeout$.MODULE$; }
-
- /** No timeout. */
- public static KeyedStateTimeout NoTimeout() { return NoTimeout$.MODULE$; }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index e0ecf8c..6225b3f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.objects.Invoke
-import org.apache.spark.sql.streaming.{KeyedStateTimeout, OutputMode }
+import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode }
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -351,22 +351,22 @@ case class MapGroups(
child: LogicalPlan) extends UnaryNode with ObjectProducer
/** Internal class representing State */
-trait LogicalKeyedState[S]
+trait LogicalGroupState[S]
/** Types of timeouts used in FlatMapGroupsWithState */
-case object NoTimeout extends KeyedStateTimeout
-case object ProcessingTimeTimeout extends KeyedStateTimeout
-case object EventTimeTimeout extends KeyedStateTimeout
+case object NoTimeout extends GroupStateTimeout
+case object ProcessingTimeTimeout extends GroupStateTimeout
+case object EventTimeTimeout extends GroupStateTimeout
/** Factory for constructing new `MapGroupsWithState` nodes. */
object FlatMapGroupsWithState {
def apply[K: Encoder, V: Encoder, S: Encoder, U: Encoder](
- func: (Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any],
+ func: (Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any],
groupingAttributes: Seq[Attribute],
dataAttributes: Seq[Attribute],
outputMode: OutputMode,
isMapGroupsWithState: Boolean,
- timeout: KeyedStateTimeout,
+ timeout: GroupStateTimeout,
child: LogicalPlan): LogicalPlan = {
val encoder = encoderFor[S]
@@ -404,7 +404,7 @@ object FlatMapGroupsWithState {
* @param timeout used to timeout groups that have not received data in a while
*/
case class FlatMapGroupsWithState(
- func: (Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any],
+ func: (Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any],
keyDeserializer: Expression,
valueDeserializer: Expression,
groupingAttributes: Seq[Attribute],
@@ -413,7 +413,7 @@ case class FlatMapGroupsWithState(
stateEncoder: ExpressionEncoder[Any],
outputMode: OutputMode,
isMapGroupsWithState: Boolean = false,
- timeout: KeyedStateTimeout,
+ timeout: GroupStateTimeout,
child: LogicalPlan) extends UnaryNode with ObjectProducer {
if (isMapGroupsWithState) {
http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaGroupStateTimeoutSuite.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaGroupStateTimeoutSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaGroupStateTimeoutSuite.java
new file mode 100644
index 0000000..2e8f2e3
--- /dev/null
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaGroupStateTimeoutSuite.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeTimeout$;
+import org.apache.spark.sql.catalyst.plans.logical.NoTimeout$;
+import org.apache.spark.sql.catalyst.plans.logical.ProcessingTimeTimeout$;
+import org.junit.Test;
+
+public class JavaGroupStateTimeoutSuite {
+
+ @Test
+ public void testTimeouts() {
+ assert (GroupStateTimeout.ProcessingTimeTimeout() == ProcessingTimeTimeout$.MODULE$);
+ assert (GroupStateTimeout.EventTimeTimeout() == EventTimeTimeout$.MODULE$);
+ assert (GroupStateTimeout.NoTimeout() == NoTimeout$.MODULE$);
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaKeyedStateTimeoutSuite.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaKeyedStateTimeoutSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaKeyedStateTimeoutSuite.java
deleted file mode 100644
index 02c94b0..0000000
--- a/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaKeyedStateTimeoutSuite.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming;
-
-import org.apache.spark.sql.catalyst.plans.logical.ProcessingTimeTimeout$;
-import org.junit.Test;
-
-public class JavaKeyedStateTimeoutSuite {
-
- @Test
- public void testTimeouts() {
- assert(KeyedStateTimeout.ProcessingTimeTimeout() == ProcessingTimeTimeout$.MODULE$);
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java b/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java
index bdda8aa..026b37c 100644
--- a/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java
+++ b/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java
@@ -22,7 +22,7 @@ import java.util.Iterator;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.streaming.KeyedState;
+import org.apache.spark.sql.streaming.GroupState;
/**
* ::Experimental::
@@ -35,5 +35,5 @@ import org.apache.spark.sql.streaming.KeyedState;
@Experimental
@InterfaceStability.Evolving
public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
- Iterator<R> call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
+ Iterator<R> call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java b/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java
index 70f3f01..353e988 100644
--- a/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java
+++ b/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java
@@ -22,7 +22,7 @@ import java.util.Iterator;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.streaming.KeyedState;
+import org.apache.spark.sql.streaming.GroupState;
/**
* ::Experimental::
@@ -34,5 +34,5 @@ import org.apache.spark.sql.streaming.KeyedState;
@Experimental
@InterfaceStability.Evolving
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
- R call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
+ R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 96437f8..87c5621 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.expressions.ReduceAggregator
-import org.apache.spark.sql.streaming.{KeyedState, KeyedStateTimeout, OutputMode}
+import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
/**
* :: Experimental ::
@@ -228,7 +228,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
* For a static batch Dataset, the function will be invoked once per group. For a streaming
* Dataset, the function will be invoked for each group repeatedly in every trigger, and
* updates to each group's state will be saved across invocations.
- * See [[org.apache.spark.sql.streaming.KeyedState]] for more details.
+ * See [[org.apache.spark.sql.streaming.GroupState]] for more details.
*
* @tparam S The type of the user-defined state. Must be encodable to Spark SQL types.
* @tparam U The type of the output objects. Must be encodable to Spark SQL types.
@@ -240,17 +240,17 @@ class KeyValueGroupedDataset[K, V] private[sql](
@Experimental
@InterfaceStability.Evolving
def mapGroupsWithState[S: Encoder, U: Encoder](
- func: (K, Iterator[V], KeyedState[S]) => U): Dataset[U] = {
- val flatMapFunc = (key: K, it: Iterator[V], s: KeyedState[S]) => Iterator(func(key, it, s))
+ func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
+ val flatMapFunc = (key: K, it: Iterator[V], s: GroupState[S]) => Iterator(func(key, it, s))
Dataset[U](
sparkSession,
FlatMapGroupsWithState[K, V, S, U](
- flatMapFunc.asInstanceOf[(Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any]],
+ flatMapFunc.asInstanceOf[(Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any]],
groupingAttributes,
dataAttributes,
OutputMode.Update,
isMapGroupsWithState = true,
- KeyedStateTimeout.NoTimeout,
+ GroupStateTimeout.NoTimeout,
child = logicalPlan))
}
@@ -262,7 +262,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
* For a static batch Dataset, the function will be invoked once per group. For a streaming
* Dataset, the function will be invoked for each group repeatedly in every trigger, and
* updates to each group's state will be saved across invocations.
- * See [[org.apache.spark.sql.streaming.KeyedState]] for more details.
+ * See [[org.apache.spark.sql.streaming.GroupState]] for more details.
*
* @tparam S The type of the user-defined state. Must be encodable to Spark SQL types.
* @tparam U The type of the output objects. Must be encodable to Spark SQL types.
@@ -275,13 +275,13 @@ class KeyValueGroupedDataset[K, V] private[sql](
@Experimental
@InterfaceStability.Evolving
def mapGroupsWithState[S: Encoder, U: Encoder](
- timeoutConf: KeyedStateTimeout)(
- func: (K, Iterator[V], KeyedState[S]) => U): Dataset[U] = {
- val flatMapFunc = (key: K, it: Iterator[V], s: KeyedState[S]) => Iterator(func(key, it, s))
+ timeoutConf: GroupStateTimeout)(
+ func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
+ val flatMapFunc = (key: K, it: Iterator[V], s: GroupState[S]) => Iterator(func(key, it, s))
Dataset[U](
sparkSession,
FlatMapGroupsWithState[K, V, S, U](
- flatMapFunc.asInstanceOf[(Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any]],
+ flatMapFunc.asInstanceOf[(Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any]],
groupingAttributes,
dataAttributes,
OutputMode.Update,
@@ -298,7 +298,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
* For a static batch Dataset, the function will be invoked once per group. For a streaming
* Dataset, the function will be invoked for each group repeatedly in every trigger, and
* updates to each group's state will be saved across invocations.
- * See [[KeyedState]] for more details.
+ * See [[GroupState]] for more details.
*
* @tparam S The type of the user-defined state. Must be encodable to Spark SQL types.
* @tparam U The type of the output objects. Must be encodable to Spark SQL types.
@@ -316,7 +316,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
stateEncoder: Encoder[S],
outputEncoder: Encoder[U]): Dataset[U] = {
mapGroupsWithState[S, U](
- (key: K, it: Iterator[V], s: KeyedState[S]) => func.call(key, it.asJava, s)
+ (key: K, it: Iterator[V], s: GroupState[S]) => func.call(key, it.asJava, s)
)(stateEncoder, outputEncoder)
}
@@ -328,7 +328,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
* For a static batch Dataset, the function will be invoked once per group. For a streaming
* Dataset, the function will be invoked for each group repeatedly in every trigger, and
* updates to each group's state will be saved across invocations.
- * See [[KeyedState]] for more details.
+ * See [[GroupState]] for more details.
*
* @tparam S The type of the user-defined state. Must be encodable to Spark SQL types.
* @tparam U The type of the output objects. Must be encodable to Spark SQL types.
@@ -346,9 +346,9 @@ class KeyValueGroupedDataset[K, V] private[sql](
func: MapGroupsWithStateFunction[K, V, S, U],
stateEncoder: Encoder[S],
outputEncoder: Encoder[U],
- timeoutConf: KeyedStateTimeout): Dataset[U] = {
+ timeoutConf: GroupStateTimeout): Dataset[U] = {
mapGroupsWithState[S, U](
- (key: K, it: Iterator[V], s: KeyedState[S]) => func.call(key, it.asJava, s)
+ (key: K, it: Iterator[V], s: GroupState[S]) => func.call(key, it.asJava, s)
)(stateEncoder, outputEncoder)
}
@@ -360,7 +360,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
* For a static batch Dataset, the function will be invoked once per group. For a streaming
* Dataset, the function will be invoked for each group repeatedly in every trigger, and
* updates to each group's state will be saved across invocations.
- * See [[KeyedState]] for more details.
+ * See [[GroupState]] for more details.
*
* @tparam S The type of the user-defined state. Must be encodable to Spark SQL types.
* @tparam U The type of the output objects. Must be encodable to Spark SQL types.
@@ -375,15 +375,15 @@ class KeyValueGroupedDataset[K, V] private[sql](
@InterfaceStability.Evolving
def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
- timeoutConf: KeyedStateTimeout)(
- func: (K, Iterator[V], KeyedState[S]) => Iterator[U]): Dataset[U] = {
+ timeoutConf: GroupStateTimeout)(
+ func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U] = {
if (outputMode != OutputMode.Append && outputMode != OutputMode.Update) {
throw new IllegalArgumentException("The output mode of function should be append or update")
}
Dataset[U](
sparkSession,
FlatMapGroupsWithState[K, V, S, U](
- func.asInstanceOf[(Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any]],
+ func.asInstanceOf[(Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any]],
groupingAttributes,
dataAttributes,
outputMode,
@@ -400,7 +400,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
* For a static batch Dataset, the function will be invoked once per group. For a streaming
* Dataset, the function will be invoked for each group repeatedly in every trigger, and
* updates to each group's state will be saved across invocations.
- * See [[KeyedState]] for more details.
+ * See [[GroupState]] for more details.
*
* @tparam S The type of the user-defined state. Must be encodable to Spark SQL types.
* @tparam U The type of the output objects. Must be encodable to Spark SQL types.
@@ -420,8 +420,8 @@ class KeyValueGroupedDataset[K, V] private[sql](
outputMode: OutputMode,
stateEncoder: Encoder[S],
outputEncoder: Encoder[U],
- timeoutConf: KeyedStateTimeout): Dataset[U] = {
- val f = (key: K, it: Iterator[V], s: KeyedState[S]) => func.call(key, it.asJava, s).asScala
+ timeoutConf: GroupStateTimeout): Dataset[U] = {
+ val f = (key: K, it: Iterator[V], s: GroupState[S]) => func.call(key, it.asJava, s).asScala
flatMapGroupsWithState[S, U](outputMode, timeoutConf)(f)(stateEncoder, outputEncoder)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index fdd1bcc..48c7b80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -31,8 +31,8 @@ import org.apache.spark.sql.catalyst.expressions.objects.Invoke
import org.apache.spark.sql.catalyst.plans.logical.FunctionUtils
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.plans.logical.LogicalKeyedState
-import org.apache.spark.sql.execution.streaming.KeyedStateImpl
+import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState
+import org.apache.spark.sql.execution.streaming.GroupStateImpl
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -355,14 +355,14 @@ case class MapGroupsExec(
object MapGroupsExec {
def apply(
- func: (Any, Iterator[Any], LogicalKeyedState[Any]) => TraversableOnce[Any],
+ func: (Any, Iterator[Any], LogicalGroupState[Any]) => TraversableOnce[Any],
keyDeserializer: Expression,
valueDeserializer: Expression,
groupingAttributes: Seq[Attribute],
dataAttributes: Seq[Attribute],
outputObjAttr: Attribute,
child: SparkPlan): MapGroupsExec = {
- val f = (key: Any, values: Iterator[Any]) => func(key, values, new KeyedStateImpl[Any](None))
+ val f = (key: Any, values: Iterator[Any]) => func(key, values, new GroupStateImpl[Any](None))
new MapGroupsExec(f, keyDeserializer, valueDeserializer,
groupingAttributes, dataAttributes, outputObjAttr, child)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index 52ad70c..c7262ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -23,9 +23,9 @@ import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Attribut
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution}
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.streaming.KeyedStateImpl.NO_TIMESTAMP
+import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP
import org.apache.spark.sql.execution.streaming.state._
-import org.apache.spark.sql.streaming.{KeyedStateTimeout, OutputMode}
+import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.util.CompletionIterator
@@ -44,7 +44,7 @@ import org.apache.spark.util.CompletionIterator
* @param batchTimestampMs processing timestamp of the current batch.
*/
case class FlatMapGroupsWithStateExec(
- func: (Any, Iterator[Any], LogicalKeyedState[Any]) => Iterator[Any],
+ func: (Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any],
keyDeserializer: Expression,
valueDeserializer: Expression,
groupingAttributes: Seq[Attribute],
@@ -53,13 +53,13 @@ case class FlatMapGroupsWithStateExec(
stateId: Option[OperatorStateId],
stateEncoder: ExpressionEncoder[Any],
outputMode: OutputMode,
- timeoutConf: KeyedStateTimeout,
+ timeoutConf: GroupStateTimeout,
batchTimestampMs: Option[Long],
override val eventTimeWatermark: Option[Long],
child: SparkPlan
) extends UnaryExecNode with ObjectProducerExec with StateStoreWriter with WatermarkSupport {
- import KeyedStateImpl._
+ import GroupStateImpl._
private val isTimeoutEnabled = timeoutConf != NoTimeout
private val timestampTimeoutAttribute =
@@ -147,7 +147,7 @@ case class FlatMapGroupsWithStateExec(
private val stateSerializer = {
val encoderSerializer = stateEncoder.namedExpressions
if (isTimeoutEnabled) {
- encoderSerializer :+ Literal(KeyedStateImpl.NO_TIMESTAMP)
+ encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
} else {
encoderSerializer
}
@@ -211,7 +211,7 @@ case class FlatMapGroupsWithStateExec(
val keyObj = getKeyObj(keyRow) // convert key to objects
val valueObjIter = valueRowIter.map(getValueObj.apply) // convert value rows to objects
val stateObjOption = getStateObj(prevStateRowOption)
- val keyedState = new KeyedStateImpl(
+ val keyedState = new GroupStateImpl(
stateObjOption,
batchTimestampMs.getOrElse(NO_TIMESTAMP),
eventTimeWatermark.getOrElse(NO_TIMESTAMP),
@@ -247,7 +247,7 @@ case class FlatMapGroupsWithStateExec(
if (shouldWriteState) {
if (stateRowToWrite == null) {
- // This should never happen because checks in KeyedStateImpl should avoid cases
+ // This should never happen because checks in GroupStateImpl should avoid cases
// where empty state would need to be written
throw new IllegalStateException("Attempting to write empty state")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
new file mode 100644
index 0000000..148d922
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.sql.Date
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout}
+import org.apache.spark.sql.execution.streaming.GroupStateImpl._
+import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Internal implementation of the [[GroupState]] interface. Methods are not thread-safe.
+ *
+ * @param optionalValue Optional value of the state
+ * @param batchProcessingTimeMs Processing time of current batch, used to calculate timestamp
+ * for processing time timeouts
+ * @param timeoutConf Type of timeout configured. Based on this, different operations will
+ * be supported.
+ * @param hasTimedOut Whether the key for which this state wrapper is being created is
+ * getting timed out or not.
+ */
+private[sql] class GroupStateImpl[S](
+ optionalValue: Option[S],
+ batchProcessingTimeMs: Long,
+ eventTimeWatermarkMs: Long,
+ timeoutConf: GroupStateTimeout,
+ override val hasTimedOut: Boolean) extends GroupState[S] {
+
+ // Constructor to create dummy state when using mapGroupsWithState in a batch query
+ def this(optionalValue: Option[S]) = this(
+ optionalValue,
+ batchProcessingTimeMs = NO_TIMESTAMP,
+ eventTimeWatermarkMs = NO_TIMESTAMP,
+ timeoutConf = GroupStateTimeout.NoTimeout,
+ hasTimedOut = false)
+ private var value: S = optionalValue.getOrElse(null.asInstanceOf[S])
+ private var defined: Boolean = optionalValue.isDefined
+ private var updated: Boolean = false // whether value has been updated (but not removed)
+ private var removed: Boolean = false // whether value has been removed
+ private var timeoutTimestamp: Long = NO_TIMESTAMP
+
+ // ========= Public API =========
+ override def exists: Boolean = defined
+
+ override def get: S = {
+ if (defined) {
+ value
+ } else {
+ throw new NoSuchElementException("State is either not defined or has already been removed")
+ }
+ }
+
+ override def getOption: Option[S] = {
+ if (defined) {
+ Some(value)
+ } else {
+ None
+ }
+ }
+
+ override def update(newValue: S): Unit = {
+ if (newValue == null) {
+ throw new IllegalArgumentException("'null' is not a valid state value")
+ }
+ value = newValue
+ defined = true
+ updated = true
+ removed = false
+ }
+
+ override def remove(): Unit = {
+ defined = false
+ updated = false
+ removed = true
+ timeoutTimestamp = NO_TIMESTAMP
+ }
+
+ override def setTimeoutDuration(durationMs: Long): Unit = {
+ if (timeoutConf != ProcessingTimeTimeout) {
+ throw new UnsupportedOperationException(
+ "Cannot set timeout duration without enabling processing time timeout in " +
+ "map/flatMapGroupsWithState")
+ }
+ if (!defined) {
+ throw new IllegalStateException(
+ "Cannot set timeout information without any state value, " +
+ "state has either not been initialized, or has already been removed")
+ }
+
+ if (durationMs <= 0) {
+ throw new IllegalArgumentException("Timeout duration must be positive")
+ }
+ if (!removed && batchProcessingTimeMs != NO_TIMESTAMP) {
+ timeoutTimestamp = durationMs + batchProcessingTimeMs
+ } else {
+ // This is being called in a batch query, hence no processing timestamp.
+ // Just ignore any attempts to set timeout.
+ }
+ }
+
+ override def setTimeoutDuration(duration: String): Unit = {
+ setTimeoutDuration(parseDuration(duration))
+ }
+
+ @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
+ @throws[IllegalStateException]("when state is either not initialized, or already removed")
+ @throws[UnsupportedOperationException](
+ "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+ override def setTimeoutTimestamp(timestampMs: Long): Unit = {
+ checkTimeoutTimestampAllowed()
+ if (timestampMs <= 0) {
+ throw new IllegalArgumentException("Timeout timestamp must be positive")
+ }
+ if (eventTimeWatermarkMs != NO_TIMESTAMP && timestampMs < eventTimeWatermarkMs) {
+ throw new IllegalArgumentException(
+ s"Timeout timestamp ($timestampMs) cannot be earlier than the " +
+ s"current watermark ($eventTimeWatermarkMs)")
+ }
+ if (!removed && batchProcessingTimeMs != NO_TIMESTAMP) {
+ timeoutTimestamp = timestampMs
+ } else {
+ // This is being called in a batch query, hence no processing timestamp.
+ // Just ignore any attempts to set timeout.
+ }
+ }
+
+ @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
+ @throws[IllegalStateException]("when state is either not initialized, or already removed")
+ @throws[UnsupportedOperationException](
+ "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+ override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
+ checkTimeoutTimestampAllowed()
+ setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
+ }
+
+ @throws[IllegalStateException]("when state is either not initialized, or already removed")
+ @throws[UnsupportedOperationException](
+ "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+ override def setTimeoutTimestamp(timestamp: Date): Unit = {
+ checkTimeoutTimestampAllowed()
+ setTimeoutTimestamp(timestamp.getTime)
+ }
+
+ @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
+ @throws[IllegalStateException]("when state is either not initialized, or already removed")
+ @throws[UnsupportedOperationException](
+ "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+ override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
+ checkTimeoutTimestampAllowed()
+ setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
+ }
+
+ override def toString: String = {
+ s"GroupState(${getOption.map(_.toString).getOrElse("<undefined>")})"
+ }
+
+ // ========= Internal API =========
+
+ /** Whether the state has been marked for removing */
+ def hasRemoved: Boolean = removed
+
+ /** Whether the state has been updated */
+ def hasUpdated: Boolean = updated
+
+ /** Return timeout timestamp or `NO_TIMESTAMP` if not set */
+ def getTimeoutTimestamp: Long = timeoutTimestamp
+
+ private def parseDuration(duration: String): Long = {
+ if (StringUtils.isBlank(duration)) {
+ throw new IllegalArgumentException(
+ "Provided duration is null or blank.")
+ }
+ val intervalString = if (duration.startsWith("interval")) {
+ duration
+ } else {
+ "interval " + duration
+ }
+ val cal = CalendarInterval.fromString(intervalString)
+ if (cal == null) {
+ throw new IllegalArgumentException(
+ s"Provided duration ($duration) is not valid.")
+ }
+ if (cal.milliseconds < 0 || cal.months < 0) {
+ throw new IllegalArgumentException(s"Provided duration ($duration) is not positive")
+ }
+
+ val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+ cal.milliseconds + cal.months * millisPerMonth
+ }
+
+ private def checkTimeoutTimestampAllowed(): Unit = {
+ if (timeoutConf != EventTimeTimeout) {
+ throw new UnsupportedOperationException(
+ "Cannot set timeout timestamp without enabling event time timeout in " +
+ "map/flatMapGroupsWithState")
+ }
+ if (!defined) {
+ throw new IllegalStateException(
+ "Cannot set timeout timestamp without any state value, " +
+ "state has either not been initialized, or has already been removed")
+ }
+ }
+}
+
+
+private[sql] object GroupStateImpl {
+ // Value used to represent the lack of a valid timestamp as a long
+ val NO_TIMESTAMP = -1L
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/KeyedStateImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/KeyedStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/KeyedStateImpl.scala
deleted file mode 100644
index edfd35b..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/KeyedStateImpl.scala
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import java.sql.Date
-
-import org.apache.commons.lang3.StringUtils
-
-import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout}
-import org.apache.spark.sql.execution.streaming.KeyedStateImpl._
-import org.apache.spark.sql.streaming.{KeyedState, KeyedStateTimeout}
-import org.apache.spark.unsafe.types.CalendarInterval
-
-
-/**
- * Internal implementation of the [[KeyedState]] interface. Methods are not thread-safe.
- * @param optionalValue Optional value of the state
- * @param batchProcessingTimeMs Processing time of current batch, used to calculate timestamp
- * for processing time timeouts
- * @param timeoutConf Type of timeout configured. Based on this, different operations will
- * be supported.
- * @param hasTimedOut Whether the key for which this state wrapped is being created is
- * getting timed out or not.
- */
-private[sql] class KeyedStateImpl[S](
- optionalValue: Option[S],
- batchProcessingTimeMs: Long,
- eventTimeWatermarkMs: Long,
- timeoutConf: KeyedStateTimeout,
- override val hasTimedOut: Boolean) extends KeyedState[S] {
-
- // Constructor to create dummy state when using mapGroupsWithState in a batch query
- def this(optionalValue: Option[S]) = this(
- optionalValue,
- batchProcessingTimeMs = NO_TIMESTAMP,
- eventTimeWatermarkMs = NO_TIMESTAMP,
- timeoutConf = KeyedStateTimeout.NoTimeout,
- hasTimedOut = false)
- private var value: S = optionalValue.getOrElse(null.asInstanceOf[S])
- private var defined: Boolean = optionalValue.isDefined
- private var updated: Boolean = false // whether value has been updated (but not removed)
- private var removed: Boolean = false // whether value has been removed
- private var timeoutTimestamp: Long = NO_TIMESTAMP
-
- // ========= Public API =========
- override def exists: Boolean = defined
-
- override def get: S = {
- if (defined) {
- value
- } else {
- throw new NoSuchElementException("State is either not defined or has already been removed")
- }
- }
-
- override def getOption: Option[S] = {
- if (defined) {
- Some(value)
- } else {
- None
- }
- }
-
- override def update(newValue: S): Unit = {
- if (newValue == null) {
- throw new IllegalArgumentException("'null' is not a valid state value")
- }
- value = newValue
- defined = true
- updated = true
- removed = false
- }
-
- override def remove(): Unit = {
- defined = false
- updated = false
- removed = true
- timeoutTimestamp = NO_TIMESTAMP
- }
-
- override def setTimeoutDuration(durationMs: Long): Unit = {
- if (timeoutConf != ProcessingTimeTimeout) {
- throw new UnsupportedOperationException(
- "Cannot set timeout duration without enabling processing time timeout in " +
- "map/flatMapGroupsWithState")
- }
- if (!defined) {
- throw new IllegalStateException(
- "Cannot set timeout information without any state value, " +
- "state has either not been initialized, or has already been removed")
- }
-
- if (durationMs <= 0) {
- throw new IllegalArgumentException("Timeout duration must be positive")
- }
- if (!removed && batchProcessingTimeMs != NO_TIMESTAMP) {
- timeoutTimestamp = durationMs + batchProcessingTimeMs
- } else {
- // This is being called in a batch query, hence no processing timestamp.
- // Just ignore any attempts to set timeout.
- }
- }
-
- override def setTimeoutDuration(duration: String): Unit = {
- setTimeoutDuration(parseDuration(duration))
- }
-
- @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
- @throws[IllegalStateException]("when state is either not initialized, or already removed")
- @throws[UnsupportedOperationException](
- "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
- override def setTimeoutTimestamp(timestampMs: Long): Unit = {
- checkTimeoutTimestampAllowed()
- if (timestampMs <= 0) {
- throw new IllegalArgumentException("Timeout timestamp must be positive")
- }
- if (eventTimeWatermarkMs != NO_TIMESTAMP && timestampMs < eventTimeWatermarkMs) {
- throw new IllegalArgumentException(
- s"Timeout timestamp ($timestampMs) cannot be earlier than the " +
- s"current watermark ($eventTimeWatermarkMs)")
- }
- if (!removed && batchProcessingTimeMs != NO_TIMESTAMP) {
- timeoutTimestamp = timestampMs
- } else {
- // This is being called in a batch query, hence no processing timestamp.
- // Just ignore any attempts to set timeout.
- }
- }
-
- @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
- @throws[IllegalStateException]("when state is either not initialized, or already removed")
- @throws[UnsupportedOperationException](
- "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
- override def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit = {
- checkTimeoutTimestampAllowed()
- setTimeoutTimestamp(parseDuration(additionalDuration) + timestampMs)
- }
-
- @throws[IllegalStateException]("when state is either not initialized, or already removed")
- @throws[UnsupportedOperationException](
- "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
- override def setTimeoutTimestamp(timestamp: Date): Unit = {
- checkTimeoutTimestampAllowed()
- setTimeoutTimestamp(timestamp.getTime)
- }
-
- @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
- @throws[IllegalStateException]("when state is either not initialized, or already removed")
- @throws[UnsupportedOperationException](
- "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
- override def setTimeoutTimestamp(timestamp: Date, additionalDuration: String): Unit = {
- checkTimeoutTimestampAllowed()
- setTimeoutTimestamp(timestamp.getTime + parseDuration(additionalDuration))
- }
-
- override def toString: String = {
- s"KeyedState(${getOption.map(_.toString).getOrElse("<undefined>")})"
- }
-
- // ========= Internal API =========
-
- /** Whether the state has been marked for removing */
- def hasRemoved: Boolean = removed
-
- /** Whether the state has been updated */
- def hasUpdated: Boolean = updated
-
- /** Return timeout timestamp or `TIMEOUT_TIMESTAMP_NOT_SET` if not set */
- def getTimeoutTimestamp: Long = timeoutTimestamp
-
- private def parseDuration(duration: String): Long = {
- if (StringUtils.isBlank(duration)) {
- throw new IllegalArgumentException(
- "Provided duration is null or blank.")
- }
- val intervalString = if (duration.startsWith("interval")) {
- duration
- } else {
- "interval " + duration
- }
- val cal = CalendarInterval.fromString(intervalString)
- if (cal == null) {
- throw new IllegalArgumentException(
- s"Provided duration ($duration) is not valid.")
- }
- if (cal.milliseconds < 0 || cal.months < 0) {
- throw new IllegalArgumentException(s"Provided duration ($duration) is not positive")
- }
-
- val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
- cal.milliseconds + cal.months * millisPerMonth
- }
-
- private def checkTimeoutTimestampAllowed(): Unit = {
- if (timeoutConf != EventTimeTimeout) {
- throw new UnsupportedOperationException(
- "Cannot set timeout timestamp without enabling event time timeout in " +
- "map/flatMapGroupsWithState")
- }
- if (!defined) {
- throw new IllegalStateException(
- "Cannot set timeout timestamp without any state value, " +
- "state has either not been initialized, or has already been removed")
- }
- }
-}
-
-
-private[sql] object KeyedStateImpl {
- // Value used represent the lack of valid timestamp as a long
- val NO_TIMESTAMP = -1L
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index f72144a..8dbda29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -23,13 +23,13 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, Predicate}
-import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalKeyedState, ProcessingTimeTimeout}
+import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalGroupState, ProcessingTimeTimeout}
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.streaming.state._
-import org.apache.spark.sql.streaming.{KeyedStateTimeout, OutputMode}
+import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
import org.apache.spark.sql.types._
import org.apache.spark.util.CompletionIterator
http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
new file mode 100644
index 0000000..60a4d0d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.sql.{Encoder, KeyValueGroupedDataset}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState
+
+/**
+ * :: Experimental ::
+ *
+ * Wrapper class for interacting with per-group state data in `mapGroupsWithState` and
+ * `flatMapGroupsWithState` operations on [[KeyValueGroupedDataset]].
+ *
+ * Detail description on `[map/flatMap]GroupsWithState` operation
+ * --------------------------------------------------------------
+ * Both, `mapGroupsWithState` and `flatMapGroupsWithState` in [[KeyValueGroupedDataset]]
+ * will invoke the user-given function on each group (defined by the grouping function in
+ * `Dataset.groupByKey()`) while maintaining user-defined per-group state between invocations.
+ * For a static batch Dataset, the function will be invoked once per group. For a streaming
+ * Dataset, the function will be invoked for each group repeatedly in every trigger.
+ * That is, in every batch of the `streaming.StreamingQuery`,
+ * the function will be invoked once for each group that has data in the trigger. Furthermore,
+ * if timeout is set, then the function will be invoked on timed out groups (more detail below).
+ *
+ * The function is invoked with following parameters.
+ * - The key of the group.
+ * - An iterator containing all the values for this group.
+ * - A user-defined state object set by previous invocations of the given function.
+ * In case of a batch Dataset, there is only one invocation and state object will be empty as
+ * there is no prior state. Essentially, for batch Datasets, `[map/flatMap]GroupsWithState`
+ * is equivalent to `[map/flatMap]Groups` and any updates to the state and/or timeouts have
+ * no effect.
+ *
+ * Important points to note about the function.
+ * - In a trigger, the function will be called only for the groups present in the batch. So do
+ * not assume that the function will be called in every trigger for every group that has state.
+ * - There is no guaranteed ordering of values in the iterator in the function, neither with
+ * batch, nor with streaming Datasets.
+ * - All the data will be shuffled before applying the function.
+ * - If timeout is set, then the function will also be called with no values.
+ * See more details on `GroupStateTimeout` below.
+ *
+ * Important points to note about using `GroupState`.
+ * - The value of the state cannot be null. So updating state with null will throw
+ * `IllegalArgumentException`.
+ * - Operations on `GroupState` are not thread-safe. This is to avoid memory barriers.
+ * - If `remove()` is called, then `exists()` will return `false`,
+ * `get()` will throw `NoSuchElementException` and `getOption()` will return `None`
+ * - After that, if `update(newState)` is called, then `exists()` will again return `true`,
+ * `get()` and `getOption()` will return the updated value.
+ *
+ * Important points to note about using `GroupStateTimeout`.
+ * - The timeout type is a global param across all the groups (set as `timeout` param in
+ * `[map|flatMap]GroupsWithState`), but the exact timeout duration/timestamp is configurable per
+ * group by calling `setTimeout...()` in `GroupState`.
+ * - Timeouts can be either based on processing time (i.e.
+ * [[GroupStateTimeout.ProcessingTimeTimeout]]) or event time (i.e.
+ * [[GroupStateTimeout.EventTimeTimeout]]).
+ * - With `ProcessingTimeTimeout`, the timeout duration can be set by calling
+ * `GroupState.setTimeoutDuration`. The timeout will occur when the clock has advanced by the set
+ * duration. Guarantees provided by this timeout with a duration of D ms are as follows:
+ * - Timeout will never occur before the clock time has advanced by D ms
+ * - Timeout will occur eventually when there is a trigger in the query
+ * (i.e. after D ms). So there is no strict upper bound on when the timeout would occur.
+ * For example, the trigger interval of the query will affect when the timeout actually occurs.
+ * If there is no data in the stream (for any group) for a while, then there will not be
+ * any trigger and timeout function call will not occur until there is data.
+ * - Since the processing time timeout is based on the clock time, it is affected by the
+ * variations in the system clock (i.e. time zone changes, clock skew, etc.).
+ * - With `EventTimeTimeout`, the user also has to specify the event time watermark in
+ * the query using `Dataset.withWatermark()`. With this setting, data that is older than the
+ * watermark is filtered out. The timeout can be set for a group by setting a timeout timestamp
+ * using `GroupState.setTimeoutTimestamp()`, and the timeout would occur when the watermark
+ * advances beyond the set timestamp. You can control the timeout delay by two parameters -
+ * (i) watermark delay and (ii) an additional duration beyond the timestamp in the event
+ * (which is guaranteed to be newer than watermark due to the filtering). Guarantees provided
+ * by this timeout are as follows:
+ * - Timeout will never occur before watermark has exceeded the set timeout.
+ * - Similar to processing time timeouts, there is no strict upper bound on the delay when
+ * the timeout actually occurs. The watermark can advance only when there is data in the
+ * stream, and the event time of the data has actually advanced.
+ * - When the timeout occurs for a group, the function is called for that group with no values, and
+ * `GroupState.hasTimedOut()` set to true.
+ * - The timeout is reset every time the function is called on a group, that is,
+ * when the group has new data, or the group has timed out. So the user has to set the timeout
+ * duration every time the function is called, otherwise there will not be any timeout set.
+ *
+ * Scala example of using GroupState in `mapGroupsWithState`:
+ * {{{
+ * // A mapping function that maintains an integer state for string keys and returns a string.
+ * // Additionally, it sets a timeout to remove the state if it has not received data for an hour.
+ * def mappingFunction(key: String, value: Iterator[Int], state: GroupState[Int]): String = {
+ *
+ * if (state.hasTimedOut) { // If called when timing out, remove the state
+ * state.remove()
+ *
+ * } else if (state.exists) { // If state exists, use it for processing
+ * val existingState = state.get // Get the existing state
+ * val shouldRemove = ... // Decide whether to remove the state
+ * if (shouldRemove) {
+ * state.remove() // Remove the state
+ *
+ * } else {
+ * val newState = ...
+ * state.update(newState) // Set the new state
+ * state.setTimeoutDuration("1 hour") // Set the timeout
+ * }
+ *
+ * } else {
+ * val initialState = ...
+ * state.update(initialState) // Set the initial state
+ * state.setTimeoutDuration("1 hour") // Set the timeout
+ * }
+ * ...
+ * // return something
+ * }
+ *
+ * dataset
+ * .groupByKey(...)
+ * .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(mappingFunction)
+ * }}}
+ *
+ * Java example of using `GroupState`:
+ * {{{
+ * // A mapping function that maintains an integer state for string keys and returns a string.
+ * // Additionally, it sets a timeout to remove the state if it has not received data for an hour.
+ * MapGroupsWithStateFunction<String, Integer, Integer, String> mappingFunction =
+ * new MapGroupsWithStateFunction<String, Integer, Integer, String>() {
+ *
+ * @Override
+ * public String call(String key, Iterator<Integer> value, GroupState<Integer> state) {
+ * if (state.hasTimedOut()) { // If called when timing out, remove the state
+ * state.remove();
+ *
+ * } else if (state.exists()) { // If state exists, use it for processing
+ * int existingState = state.get(); // Get the existing state
+ * boolean shouldRemove = ...; // Decide whether to remove the state
+ * if (shouldRemove) {
+ * state.remove(); // Remove the state
+ *
+ * } else {
+ * int newState = ...;
+ * state.update(newState); // Set the new state
+ * state.setTimeoutDuration("1 hour"); // Set the timeout
+ * }
+ *
+ * } else {
+ * int initialState = ...; // Set the initial state
+ * state.update(initialState);
+ * state.setTimeoutDuration("1 hour"); // Set the timeout
+ * }
+ * ...
+ * // return something
+ * }
+ * };
+ *
+ * dataset
+ * .groupByKey(...)
+ * .mapGroupsWithState(
+ * mappingFunction, Encoders.INT, Encoders.STRING, GroupStateTimeout.ProcessingTimeTimeout);
+ * }}}
+ *
+ * @tparam S User-defined type of the state to be stored for each group. Must be encodable into
+ * Spark SQL types (see [[Encoder]] for more details).
+ * @since 2.2.0
+ */
+@Experimental
+@InterfaceStability.Evolving
+trait GroupState[S] extends LogicalGroupState[S] {
+
+ /** Whether state exists or not. */
+ def exists: Boolean
+
+ /** Get the state value if it exists, or throw NoSuchElementException. */
+ @throws[NoSuchElementException]("when state does not exist")
+ def get: S
+
+ /** Get the state value as a scala Option. */
+ def getOption: Option[S]
+
+ /**
+ * Update the value of the state. Note that `null` is not a valid value, and it throws
+ * IllegalArgumentException.
+ */
+ @throws[IllegalArgumentException]("when updating with null")
+ def update(newState: S): Unit
+
+ /** Remove this state. Note that this resets any timeout configuration as well. */
+ def remove(): Unit
+
+ /**
+ * Whether the function has been called because the key has timed out.
+ * @note This can return true only when timeouts are enabled in `[map/flatMap]GroupsWithState`.
+ */
+ def hasTimedOut: Boolean
+
+ /**
+ * Set the timeout duration in ms for this key.
+ *
+ * @note ProcessingTimeTimeout must be enabled in `[map/flatMap]GroupsWithState`.
+ */
+ @throws[IllegalArgumentException]("if 'durationMs' is not positive")
+ @throws[IllegalStateException]("when state is either not initialized, or already removed")
+ @throws[UnsupportedOperationException](
+ "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+ def setTimeoutDuration(durationMs: Long): Unit
+
+ /**
+ * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
+ *
+ * @note ProcessingTimeTimeout must be enabled in `[map/flatMap]GroupsWithState`.
+ */
+ @throws[IllegalArgumentException]("if 'duration' is not a valid duration")
+ @throws[IllegalStateException]("when state is either not initialized, or already removed")
+ @throws[UnsupportedOperationException](
+ "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+ def setTimeoutDuration(duration: String): Unit
+
+ @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
+ @throws[IllegalStateException]("when state is either not initialized, or already removed")
+ @throws[UnsupportedOperationException](
+ "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+ /**
+ * Set the timeout timestamp for this key as milliseconds in epoch time.
+ * This timestamp cannot be older than the current watermark.
+ *
+ * @note EventTimeTimeout must be enabled in `[map/flatMap]GroupsWithState`.
+ */
+ def setTimeoutTimestamp(timestampMs: Long): Unit
+
+ @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
+ @throws[IllegalStateException]("when state is either not initialized, or already removed")
+ @throws[UnsupportedOperationException](
+ "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+ /**
+ * Set the timeout timestamp for this key as milliseconds in epoch time and an additional
+ * duration as a string (e.g. "1 hour", "2 days", etc.).
+ * The final timestamp (including the additional duration) cannot be older than the
+ * current watermark.
+ *
+ * @note EventTimeTimeout must be enabled in `[map/flatMap]GroupsWithState`.
+ */
+ def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit
+
+ @throws[IllegalStateException]("when state is either not initialized, or already removed")
+ @throws[UnsupportedOperationException](
+ "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+ /**
+ * Set the timeout timestamp for this key as a java.sql.Date.
+ * This timestamp cannot be older than the current watermark.
+ *
+ * @note EventTimeTimeout must be enabled in `[map/flatMap]GroupsWithState`.
+ */
+ def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
+
+ @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
+ @throws[IllegalStateException]("when state is either not initialized, or already removed")
+ @throws[UnsupportedOperationException](
+ "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
+ /**
+ * Set the timeout timestamp for this key as a java.sql.Date and an additional
+ * duration as a string (e.g. "1 hour", "2 days", etc.).
+ * The final timestamp (including the additional duration) cannot be older than the
+ * current watermark.
+ *
+ * @note EventTimeTimeout must be enabled in `[map/flatMap]GroupsWithState`.
+ */
+ def setTimeoutTimestamp(timestamp: java.sql.Date, additionalDuration: String): Unit
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/main/scala/org/apache/spark/sql/streaming/KeyedState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/KeyedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/KeyedState.scala
deleted file mode 100644
index 461de04..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/KeyedState.scala
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.apache.spark.annotation.{Experimental, InterfaceStability}
-import org.apache.spark.sql.{Encoder, KeyValueGroupedDataset}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalKeyedState
-
-/**
- * :: Experimental ::
- *
- * Wrapper class for interacting with keyed state data in `mapGroupsWithState` and
- * `flatMapGroupsWithState` operations on
- * [[KeyValueGroupedDataset]].
- *
- * Detail description on `[map/flatMap]GroupsWithState` operation
- * --------------------------------------------------------------
- * Both, `mapGroupsWithState` and `flatMapGroupsWithState` in [[KeyValueGroupedDataset]]
- * will invoke the user-given function on each group (defined by the grouping function in
- * `Dataset.groupByKey()`) while maintaining user-defined per-group state between invocations.
- * For a static batch Dataset, the function will be invoked once per group. For a streaming
- * Dataset, the function will be invoked for each group repeatedly in every trigger.
- * That is, in every batch of the `streaming.StreamingQuery`,
- * the function will be invoked once for each group that has data in the trigger. Furthermore,
- * if timeout is set, then the function will be invoked on timed out keys (more detail below).
- *
- * The function is invoked with following parameters.
- * - The key of the group.
- * - An iterator containing all the values for this key.
- * - A user-defined state object set by previous invocations of the given function.
- * In case of a batch Dataset, there is only one invocation and state object will be empty as
- * there is no prior state. Essentially, for batch Datasets, `[map/flatMap]GroupsWithState`
- * is equivalent to `[map/flatMap]Groups` and any updates to the state and/or timeouts have
- * no effect.
- *
- * Important points to note about the function.
- * - In a trigger, the function will be called only for the groups present in the batch. So do not
- * assume that the function will be called in every trigger for every group that has state.
- * - There is no guaranteed ordering of values in the iterator in the function, neither with
- * batch, nor with streaming Datasets.
- * - All the data will be shuffled before applying the function.
- * - If timeout is set, then the function will also be called with no values.
- * See more details on `KeyedStateTimeout` below.
- *
- * Important points to note about using `KeyedState`.
- * - The value of the state cannot be null. So updating state with null will throw
- * `IllegalArgumentException`.
- * - Operations on `KeyedState` are not thread-safe. This is to avoid memory barriers.
- * - If `remove()` is called, then `exists()` will return `false`,
- * `get()` will throw `NoSuchElementException` and `getOption()` will return `None`
- * - After that, if `update(newState)` is called, then `exists()` will again return `true`,
- * `get()` and `getOption()` will return the updated value.
- *
- * Important points to note about using `KeyedStateTimeout`.
- * - The timeout type is a global param across all the keys (set as `timeout` param in
- * `[map|flatMap]GroupsWithState`), but the exact timeout duration/timestamp is configurable per
- * key by calling `setTimeout...()` in `KeyedState`.
- * - Timeouts can be either based on processing time (i.e.
- * [[KeyedStateTimeout.ProcessingTimeTimeout]]) or event time (i.e.
- * [[KeyedStateTimeout.EventTimeTimeout]]).
- * - With `ProcessingTimeTimeout`, the timeout duration can be set by calling
- * `KeyedState.setTimeoutDuration`. The timeout will occur when the clock has advanced by the set
- * duration. Guarantees provided by this timeout with a duration of D ms are as follows:
- * - Timeout will never occur before the clock time has advanced by D ms
- * - Timeout will occur eventually when there is a trigger in the query
- * (i.e. after D ms). So there is no strict upper bound on when the timeout would occur.
- * For example, the trigger interval of the query will affect when the timeout actually occurs.
- * If there is no data in the stream (for any key) for a while, then there will not be
- * any trigger and timeout function call will not occur until there is data.
- * - Since the processing time timeout is based on the clock time, it is affected by the
- * variations in the system clock (i.e. time zone changes, clock skew, etc.).
- * - With `EventTimeTimeout`, the user also has to specify the event time watermark in
- * the query using `Dataset.withWatermark()`. With this setting, data that is older than the
- * watermark is filtered out. The timeout can be enabled for a key by setting a timestamp using
- * `KeyedState.setTimeoutTimestamp()`, and the timeout would occur when the watermark advances
- * beyond the set timestamp. You can control the timeout delay by two parameters - (i) watermark
- * delay and (ii) an additional duration beyond the timestamp in the event (which is guaranteed
- * to be > watermark due to the filtering). Guarantees provided by this timeout are as follows:
- * - Timeout will never occur before watermark has exceeded the set timeout.
- * - Similar to processing time timeouts, there is no strict upper bound on the delay when
- * the timeout actually occurs. The watermark can advance only when there is data in the
- * stream, and the event time of the data has actually advanced.
- * - When the timeout occurs for a key, the function is called for that key with no values, and
- * `KeyedState.hasTimedOut()` set to true.
- * - The timeout is reset for key every time the function is called on the key, that is,
- * when the key has new data, or the key has timed out. So the user has to set the timeout
- * duration every time the function is called, otherwise there will not be any timeout set.
- *
- * Scala example of using KeyedState in `mapGroupsWithState`:
- * {{{
- * // A mapping function that maintains an integer state for string keys and returns a string.
- * // Additionally, it sets a timeout to remove the state if it has not received data for an hour.
- * def mappingFunction(key: String, value: Iterator[Int], state: KeyedState[Int]): String = {
- *
- * if (state.hasTimedOut) { // If called when timing out, remove the state
- * state.remove()
- *
- * } else if (state.exists) { // If state exists, use it for processing
- * val existingState = state.get // Get the existing state
- * val shouldRemove = ... // Decide whether to remove the state
- * if (shouldRemove) {
- * state.remove() // Remove the state
- *
- * } else {
- * val newState = ...
- * state.update(newState) // Set the new state
- * state.setTimeoutDuration("1 hour") // Set the timeout
- * }
- *
- * } else {
- * val initialState = ...
- * state.update(initialState) // Set the initial state
- * state.setTimeoutDuration("1 hour") // Set the timeout
- * }
- * ...
- * // return something
- * }
- *
- * dataset
- * .groupByKey(...)
- * .mapGroupsWithState(KeyedStateTimeout.ProcessingTimeTimeout)(mappingFunction)
- * }}}
- *
- * Java example of using `KeyedState`:
- * {{{
- * // A mapping function that maintains an integer state for string keys and returns a string.
- * // Additionally, it sets a timeout to remove the state if it has not received data for an hour.
- * MapGroupsWithStateFunction<String, Integer, Integer, String> mappingFunction =
- * new MapGroupsWithStateFunction<String, Integer, Integer, String>() {
- *
- * @Override
- * public String call(String key, Iterator<Integer> value, KeyedState<Integer> state) {
- * if (state.hasTimedOut()) { // If called when timing out, remove the state
- * state.remove();
- *
- * } else if (state.exists()) { // If state exists, use it for processing
- * int existingState = state.get(); // Get the existing state
- * boolean shouldRemove = ...; // Decide whether to remove the state
- * if (shouldRemove) {
- * state.remove(); // Remove the state
- *
- * } else {
- * int newState = ...;
- * state.update(newState); // Set the new state
- * state.setTimeoutDuration("1 hour"); // Set the timeout
- * }
- *
- * } else {
- * int initialState = ...; // Set the initial state
- * state.update(initialState);
- * state.setTimeoutDuration("1 hour"); // Set the timeout
- * }
- * ...
- * // return something
- * }
- * };
- *
- * dataset
- * .groupByKey(...)
- * .mapGroupsWithState(
- * mappingFunction, Encoders.INT(), Encoders.STRING(), KeyedStateTimeout.ProcessingTimeTimeout());
- * }}}
- *
- * @tparam S User-defined type of the state to be stored for each key. Must be encodable into
- * Spark SQL types (see [[Encoder]] for more details).
- * @since 2.2.0
- */
-@Experimental
-@InterfaceStability.Evolving
-trait KeyedState[S] extends LogicalKeyedState[S] {
-
- /** Whether the state exists or not. */
- def exists: Boolean
-
- /** Get the state value if it exists, or throw NoSuchElementException. */
- @throws[NoSuchElementException]("when state does not exist")
- def get: S
-
- /** Get the state value as a Scala Option. */
- def getOption: Option[S]
-
- /**
- * Update the value of the state. Note that `null` is not a valid value; updating the state
- * with `null` throws IllegalArgumentException.
- */
- @throws[IllegalArgumentException]("when updating with null")
- def update(newState: S): Unit
-
- /** Remove this keyed state. Note that this resets any timeout configuration as well. */
- def remove(): Unit
-
- /**
- * Whether the function has been called because the key has timed out.
- * @note This can return true only when timeouts are enabled in `[map/flatMap]GroupsWithState`.
- */
- def hasTimedOut: Boolean
-
- /**
- * Set the timeout duration in ms for this key.
- *
- * @note ProcessingTimeTimeout must be enabled in `[map/flatMap]GroupsWithState`.
- */
- @throws[IllegalArgumentException]("if 'durationMs' is not positive")
- @throws[IllegalStateException]("when state is either not initialized, or already removed")
- @throws[UnsupportedOperationException](
- "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
- def setTimeoutDuration(durationMs: Long): Unit
-
- /**
- * Set the timeout duration for this key as a string. For example, "1 hour", "2 days", etc.
- *
- * @note ProcessingTimeTimeout must be enabled in `[map/flatMap]GroupsWithState`.
- */
- @throws[IllegalArgumentException]("if 'duration' is not a valid duration")
- @throws[IllegalStateException]("when state is either not initialized, or already removed")
- @throws[UnsupportedOperationException](
- "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
- def setTimeoutDuration(duration: String): Unit
-
- @throws[IllegalArgumentException]("if 'timestampMs' is not positive")
- @throws[IllegalStateException]("when state is either not initialized, or already removed")
- @throws[UnsupportedOperationException](
- "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
- /**
- * Set the timeout timestamp for this key as milliseconds in epoch time.
- * This timestamp cannot be older than the current watermark.
- *
- * @note EventTimeTimeout must be enabled in `[map/flatMap]GroupsWithState`.
- */
- def setTimeoutTimestamp(timestampMs: Long): Unit
-
- @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
- @throws[IllegalStateException]("when state is either not initialized, or already removed")
- @throws[UnsupportedOperationException](
- "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
- /**
- * Set the timeout timestamp for this key as milliseconds in epoch time and an additional
- * duration as a string (e.g. "1 hour", "2 days", etc.).
- * The final timestamp (including the additional duration) cannot be older than the
- * current watermark.
- *
- * @note EventTimeTimeout must be enabled in `[map/flatMap]GroupsWithState`.
- */
- def setTimeoutTimestamp(timestampMs: Long, additionalDuration: String): Unit
-
- @throws[IllegalStateException]("when state is either not initialized, or already removed")
- @throws[UnsupportedOperationException](
- "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
- /**
- * Set the timeout timestamp for this key as a java.sql.Date.
- * This timestamp cannot be older than the current watermark.
- *
- * @note EventTimeTimeout must be enabled in `[map/flatMap]GroupsWithState`.
- */
- def setTimeoutTimestamp(timestamp: java.sql.Date): Unit
-
- @throws[IllegalArgumentException]("if 'additionalDuration' is invalid")
- @throws[IllegalStateException]("when state is either not initialized, or already removed")
- @throws[UnsupportedOperationException](
- "if 'timeout' has not been enabled in [map|flatMap]GroupsWithState in a streaming query")
- /**
- * Set the timeout timestamp for this key as a java.sql.Date and an additional
- * duration as a string (e.g. "1 hour", "2 days", etc.).
- * The final timestamp (including the additional duration) cannot be older than the
- * current watermark.
- *
- * @note EventTimeTimeout must be enabled in `[map/flatMap]GroupsWithState`.
- */
- def setTimeoutTimestamp(timestamp: java.sql.Date, additionalDuration: String): Unit
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/82b598b9/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index ffb4c62..78cf033 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -23,7 +23,7 @@ import java.sql.Date;
import java.sql.Timestamp;
import java.util.*;
-import org.apache.spark.sql.streaming.KeyedStateTimeout;
+import org.apache.spark.sql.streaming.GroupStateTimeout;
import org.apache.spark.sql.streaming.OutputMode;
import scala.Tuple2;
import scala.Tuple3;
@@ -210,7 +210,7 @@ public class JavaDatasetSuite implements Serializable {
OutputMode.Append(),
Encoders.LONG(),
Encoders.STRING(),
- KeyedStateTimeout.NoTimeout());
+ GroupStateTimeout.NoTimeout());
Assert.assertEquals(asSet("1a", "3foobar"), toSet(flatMapped2.collectAsList()));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org