You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/03/22 15:22:48 UTC

[1/7] flink git commit: [hotfix] [core] Fix checkstyle in 'org.apache.flink.api.common.state'

Repository: flink
Updated Branches:
  refs/heads/release-1.5 555e80a08 -> 69e5d1462


[hotfix] [core] Fix checkstyle in 'org.apache.flink.api.common.state'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9aec0ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9aec0ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9aec0ee

Branch: refs/heads/release-1.5
Commit: b9aec0eeebc3a8e0d46a15bae8c09266703e8945
Parents: 555e80a
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 20 15:15:08 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 22 16:04:12 2018 +0100

----------------------------------------------------------------------
 .../api/common/state/AggregatingState.java      | 10 +++---
 .../state/AggregatingStateDescriptor.java       |  6 ++--
 .../flink/api/common/state/AppendingState.java  | 26 +++++++--------
 .../flink/api/common/state/BroadcastState.java  |  4 +--
 .../flink/api/common/state/FoldingState.java    |  4 +--
 .../common/state/FoldingStateDescriptor.java    |  2 +-
 .../flink/api/common/state/KeyedStateStore.java |  2 +-
 .../flink/api/common/state/ListState.java       |  9 +++---
 .../api/common/state/ListStateDescriptor.java   |  8 ++---
 .../apache/flink/api/common/state/MapState.java | 16 ++++-----
 .../api/common/state/MapStateDescriptor.java    | 12 +++----
 .../flink/api/common/state/MergingState.java    |  2 +-
 .../api/common/state/OperatorStateStore.java    | 11 ++-----
 .../flink/api/common/state/ReducingState.java   |  4 +--
 .../common/state/ReducingStateDescriptor.java   |  8 ++---
 .../flink/api/common/state/StateBinder.java     |  4 +--
 .../flink/api/common/state/StateDescriptor.java | 12 +++----
 .../flink/api/common/state/ValueState.java      | 14 ++++----
 .../api/common/state/ValueStateDescriptor.java  |  9 +++---
 .../state/AggregatingStateDescriptorTest.java   |  8 +++--
 .../common/state/ListStateDescriptorTest.java   | 23 +++++++------
 .../common/state/MapStateDescriptorTest.java    | 19 ++++++-----
 .../state/ReducingStateDescriptorTest.java      | 34 +++++++++++---------
 .../common/state/ValueStateDescriptorTest.java  | 33 ++++++++++---------
 tools/maven/suppressions-core.xml               |  4 ---
 25 files changed, 144 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java
index e69fdb4..5c72650 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingState.java
@@ -22,22 +22,22 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.AggregateFunction;
 
 /**
- * {@link State} interface for aggregating state, based on an 
+ * {@link State} interface for aggregating state, based on an
  * {@link AggregateFunction}. Elements that are added to this type of state will
  * be eagerly pre-aggregated using a given {@code AggregateFunction}.
- * 
+ *
  * <p>The state holds internally always the accumulator type of the {@code AggregateFunction}.
- * When accessing the result of the state, the function's 
+ * When accessing the result of the state, the function's
  * {@link AggregateFunction#getResult(Object)} method is used.
  *
  * <p>The state is accessed and modified by user functions, and checkpointed consistently
  * by the system as part of the distributed snapshots.
- * 
+ *
  * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
  * automatically supplied by the system, so the function always sees the value mapped to the
  * key of the current element. That way, the system can handle stream and state partitioning
  * consistently together.
- * 
+ *
  * @param <IN> Type of the value added to the state.
  * @param <OUT> Type of the value extracted from the state.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
index b7378d6..6f6d2f9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
@@ -30,7 +30,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>The type internally stored in the state is the type of the {@code Accumulator} of the
  * {@code AggregateFunction}.
- * 
+ *
  * @param <IN> The type of the values that are added to the state.
  * @param <ACC> The type of the accumulator (intermediate aggregation state).
  * @param <OUT> The type of the values that are returned from the state.
@@ -39,7 +39,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<AggregatingState<IN, OUT>, ACC> {
 	private static final long serialVersionUID = 1L;
 
-	/** The aggregation function for the state */
+	/** The aggregation function for the state. */
 	private final AggregateFunction<IN, ACC, OUT> aggFunction;
 
 	/**
@@ -49,7 +49,7 @@ public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<Ag
 	 * consider using the {@link #AggregatingStateDescriptor(String, AggregateFunction, TypeInformation)} constructor.
 	 *
 	 * @param name The (unique) name for the state.
-	 * @param aggFunction The {@code AggregateFunction} used to aggregate the state.   
+	 * @param aggFunction The {@code AggregateFunction} used to aggregate the state.
 	 * @param stateType The type of the accumulator. The accumulator is stored in the state.
 	 */
 	public AggregatingStateDescriptor(

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
index 9723654..4624438 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
@@ -20,20 +20,18 @@ package org.apache.flink.api.common.state;
 
 import org.apache.flink.annotation.PublicEvolving;
 
-import java.io.IOException;
-
 /**
  * Base interface for partitioned state that supports adding elements and inspecting the current
  * state. Elements can either be kept in a buffer (list-like) or aggregated into one value.
  *
  * <p>The state is accessed and modified by user functions, and checkpointed consistently
  * by the system as part of the distributed snapshots.
- * 
+ *
  * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
  * automatically supplied by the system, so the function always sees the value mapped to the
  * key of the current element. That way, the system can handle stream and state partitioning
  * consistently together.
- * 
+ *
  * @param <IN> Type of the value that can be added to the state.
  * @param <OUT> Type of the value that can be retrieved from the state.
  */
@@ -47,29 +45,27 @@ public interface AppendingState<IN, OUT> extends State {
 	 * depends on the current operator input, as the operator maintains an
 	 * independent state for each partition.
 	 *
-	 * <p>
-	 *     <b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this method
-	 *     should return {@code null}.
-	 * </p>
+	 * <p><b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this method
+	 * should return {@code null}.
 	 *
 	 * @return The operator state value corresponding to the current input or {@code null}
 	 * if the state is empty.
-	 * 
+	 *
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
-	OUT get() throws Exception ;
+	OUT get() throws Exception;
 
 	/**
 	 * Updates the operator state accessible by {@link #get()} by adding the given value
 	 * to the list of values. The next time {@link #get()} is called (for the same state
 	 * partition) the returned state will represent the updated list.
 	 *
-	 * If `null` is passed in, the state value will remain unchanged
-	 * 
+	 * <p>If null is passed in, the state value will remain unchanged.
+	 *
 	 * @param value The new value for the state.
-	 *            
-	 * @throws IOException Thrown if the system cannot access the state.
+	 *
+	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	void add(IN value) throws Exception;
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/main/java/org/apache/flink/api/common/state/BroadcastState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/BroadcastState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/BroadcastState.java
index 0cece41..fcc8bbf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/BroadcastState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/BroadcastState.java
@@ -29,7 +29,7 @@ import java.util.Map;
  *
  * <p><b>CAUTION:</b> the user has to guarantee that all task instances store the same elements in this type of state.
  *
- * <p> Each operator instance individually maintains and stores elements in the broadcast state. The fact that the
+ * <p>Each operator instance individually maintains and stores elements in the broadcast state. The fact that the
  * incoming stream is a broadcast one guarantees that all instances see all the elements. Upon recovery
  * or re-scaling, the same state is given to each of the instances. To avoid hotspots, each task reads its previous
  * partition, and if there are more tasks (scale up), then the new instances read from the old instances in a round
@@ -80,7 +80,7 @@ public interface BroadcastState<K, V> extends ReadOnlyBroadcastState<K, V> {
 	Iterator<Map.Entry<K, V>> iterator() throws Exception;
 
 	/**
-	 * Returns all the mappings in the state
+	 * Returns all the mappings in the state.
 	 *
 	 * @return An iterable view of all the key-value pairs in the state.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
index df9a0c6..928e62b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
@@ -27,12 +27,12 @@ import org.apache.flink.annotation.PublicEvolving;
  *
  * <p>The state is accessed and modified by user functions, and checkpointed consistently
  * by the system as part of the distributed snapshots.
- * 
+ *
  * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
  * automatically supplied by the system, so the function always sees the value mapped to the
  * key of the current element. That way, the system can handle stream and state partitioning
  * consistently together.
- * 
+ *
  * @param <T> Type of the values folded into the state
  * @param <ACC> Type of the value in the state
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index 0954047..261d1fe 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -98,7 +98,7 @@ public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public FoldingState<T, ACC> bind(StateBinder stateBinder) throws Exception {
 		return stateBinder.createFoldingState(this);

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
index a1038a8..e3726b6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyedStateStore.java
@@ -281,5 +281,5 @@ public interface KeyedStateStore {
 	 *                                       function (function is not part of a KeyedStream).
 	 */
 	@PublicEvolving
-	<UK, UV> MapState<UK,UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);
+	<UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
index 74f275b..254dc1d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
@@ -26,22 +26,23 @@ import java.util.List;
  * {@link State} interface for partitioned list state in Operations.
  * The state is accessed and modified by user functions, and checkpointed consistently
  * by the system as part of the distributed snapshots.
- * 
+ *
  * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
  * automatically supplied by the system, so the function always sees the value mapped to the
  * key of the current element. That way, the system can handle stream and state partitioning
  * consistently together.
- * 
+ *
  * @param <T> Type of values that this list state keeps.
  */
 @PublicEvolving
 public interface ListState<T> extends MergingState<T, Iterable<T>> {
+
 	/**
 	 * Updates the operator state accessible by {@link #get()} by updating existing values to
 	 * the given list of values. The next time {@link #get()} is called (for the same state
 	 * partition) the returned state will represent the updated list.
 	 *
-	 * If `null` or an empty list is passed in, the state value will be null
+	 * <p>If null or an empty list is passed in, the state value will be null.
 	 *
 	 * @param values The new values for the state.
 	 *
@@ -54,7 +55,7 @@ public interface ListState<T> extends MergingState<T, Iterable<T>> {
 	 * to existing list of values. The next time {@link #get()} is called (for the same state
 	 * partition) the returned state will represent the updated list.
 	 *
-	 * If `null` or an empty list is passed in, the state value remains unchanged
+	 * <p>If null or an empty list is passed in, the state value remains unchanged.
 	 *
 	 * @param values The new values to be added to the state.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index e59d6ee..38e5680 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -29,12 +29,12 @@ import java.util.List;
 /**
  * A {@link StateDescriptor} for {@link ListState}. This can be used to create state where the type
  * is a list that can be appended and iterated over.
- * 
+ *
  * <p>Using {@code ListState} is typically more efficient than manually maintaining a list in a
  * {@link ValueState}, because the backing implementation can support efficient appends, rather than
  * replacing the full list on write.
- * 
- * <p>To create keyed list state (on a KeyedStream), use 
+ *
+ * <p>To create keyed list state (on a KeyedStream), use
  * {@link org.apache.flink.api.common.functions.RuntimeContext#getListState(ListStateDescriptor)}.
  *
  * @param <T> The type of the values that can be added to the list state.
@@ -85,7 +85,7 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
 
 	/**
 	 * Gets the serializer for the elements contained in the list.
-	 * 
+	 *
 	 * @return The serializer for the elements in the list.
 	 */
 	public TypeSerializer<T> getElementSerializer() {

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
index f37fddd..7a130d4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
@@ -60,7 +60,7 @@ public interface MapState<UK, UV> extends State {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	void put(UK key, UV value) throws Exception;
-	
+
 	/**
 	 * Copies all of the mappings from the given map into the state.
 	 *
@@ -90,19 +90,19 @@ public interface MapState<UK, UV> extends State {
 	boolean contains(UK key) throws Exception;
 
 	/**
-	 * Returns all the mappings in the state
+	 * Returns all the mappings in the state.
 	 *
 	 * @return An iterable view of all the key-value pairs in the state.
-	 * 
+	 *
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	Iterable<Map.Entry<UK, UV>> entries() throws Exception;
-	
+
 	/**
-	 * Returns all the keys in the state
+	 * Returns all the keys in the state.
 	 *
 	 * @return An iterable view of all the keys in the state.
-	 * 
+	 *
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	Iterable<UK> keys() throws Exception;
@@ -111,7 +111,7 @@ public interface MapState<UK, UV> extends State {
 	 * Returns all the values in the state.
 	 *
 	 * @return An iterable view of all the values in the state.
-	 * 
+	 *
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	Iterable<UV> values() throws Exception;
@@ -120,7 +120,7 @@ public interface MapState<UK, UV> extends State {
 	 * Iterates over all the mappings in the state.
 	 *
 	 * @return An iterator over all the mappings in the state
-	 * 
+	 *
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	Iterator<Map.Entry<UK, UV>> iterator() throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
index 16c00cb..2e7ac98 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
@@ -29,12 +29,12 @@ import java.util.Map;
 /**
  * A {@link StateDescriptor} for {@link MapState}. This can be used to create state where the type
  * is a map that can be updated and iterated over.
- * 
+ *
  * <p>Using {@code MapState} is typically more efficient than manually maintaining a map in a
  * {@link ValueState}, because the backing implementation can support efficient updates, rather than
  * replacing the full map on write.
- * 
- * <p>To create keyed map state (on a KeyedStream), use 
+ *
+ * <p>To create keyed map state (on a KeyedStream), use
  * {@link org.apache.flink.api.common.functions.RuntimeContext#getMapState(MapStateDescriptor)}.
  *
  * @param <UK> The type of the keys that can be added to the map state.
@@ -90,7 +90,7 @@ public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>
 
 	/**
 	 * Gets the serializer for the keys in the state.
-	 * 
+	 *
 	 * @return The serializer for the keys in the state.
 	 */
 	public TypeSerializer<UK> getKeySerializer() {
@@ -115,7 +115,7 @@ public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>
 
 		return ((MapSerializer<UK, UV>) rawSerializer).getValueSerializer();
 	}
-	
+
 	@Override
 	public int hashCode() {
 		int result = serializer.hashCode();
@@ -128,7 +128,7 @@ public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>
 		if (this == o) {
 			return true;
 		}
-		
+
 		if (o == null || getClass() != o.getClass()) {
 			return false;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java
index e79f907..8c16313 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java
@@ -24,7 +24,7 @@ import org.apache.flink.annotation.PublicEvolving;
  * Extension of {@link AppendingState} that allows merging of state. That is, two instances
  * of {@link MergingState} can be combined into a single instance that contains all the
  * information of the two merged states.
- * 
+ *
  * @param <IN> Type of the value that can be added to the state.
  * @param <OUT> Type of the value that can be retrieved from the state.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
index c2037e0..7a998e6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorStateStore.java
@@ -49,8 +49,7 @@ public interface OperatorStateStore {
 	 * @param <K> The type of the keys in the broadcast state.
 	 * @param <V> The type of the values in the broadcast state.
 	 *
-	 * @return The {@link BroadcastState Broadcast State}.
-	 * @throws Exception
+	 * @return The Broadcast State
 	 */
 	<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception;
 
@@ -73,7 +72,6 @@ public interface OperatorStateStore {
 	 * @param <S> The generic type of the state
 	 *
 	 * @return A list for all state partitions.
-	 * @throws Exception
 	 */
 	<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
 
@@ -97,7 +95,6 @@ public interface OperatorStateStore {
 	 * @param <S> The generic type of the state
 	 *
 	 * @return A list for all state partitions.
-	 * @throws Exception
 	 */
 	<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
 
@@ -123,13 +120,12 @@ public interface OperatorStateStore {
 	 * Creates (or restores) a list state. Each state is registered under a unique name.
 	 * The provided serializer is used to de/serialize the state in case of checkpointing (snapshot/restore).
 	 *
-	 * The items in the list are repartitionable by the system in case of changed operator parallelism.
+	 * <p>The items in the list are repartitionable by the system in case of changed operator parallelism.
 	 *
 	 * @param stateDescriptor The descriptor for this state, providing a name and serializer.
 	 * @param <S> The generic type of the state
 	 *
 	 * @return A list for all state partitions.
-	 * @throws Exception
 	 *
 	 * @deprecated since 1.3.0. This was deprecated as part of a refinement to the function names.
 	 *             Please use {@link #getListState(ListStateDescriptor)} instead.
@@ -140,13 +136,12 @@ public interface OperatorStateStore {
 	/**
 	 * Creates a state of the given name that uses Java serialization to persist the state. The items in the list
 	 * are repartitionable by the system in case of changed operator parallelism.
-	 * 
+	 *
 	 * <p>This is a simple convenience method. For more flexibility on how state serialization
 	 * should happen, use the {@link #getListState(ListStateDescriptor)} method.
 	 *
 	 * @param stateName The name of state to create
 	 * @return A list state using Java serialization to serialize state objects.
-	 * @throws Exception
 	 *
 	 * @deprecated since 1.3.0. Using Java serialization for persisting state is not encouraged.
 	 *             Please use {@link #getListState(ListStateDescriptor)} instead.

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java
index 25777eb..0fe3ed9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingState.java
@@ -26,12 +26,12 @@ import org.apache.flink.annotation.PublicEvolving;
  *
  * <p>The state is accessed and modified by user functions, and checkpointed consistently
  * by the system as part of the distributed snapshots.
- * 
+ *
  * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
  * automatically supplied by the system, so the function always sees the value mapped to the
  * key of the current element. That way, the system can handle stream and state partitioning
  * consistently together.
- * 
+ *
  * @param <T> Type of the value in the operator state
  */
 @PublicEvolving

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
index 3edf1ca..a14b4bd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
@@ -35,9 +35,9 @@ import static java.util.Objects.requireNonNull;
  */
 @PublicEvolving
 public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>, T> {
+
 	private static final long serialVersionUID = 1L;
-	
-	
+
 	private final ReduceFunction<T> reduceFunction;
 
 	/**
@@ -47,7 +47,7 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>
 	 * consider using the {@link #ReducingStateDescriptor(String, ReduceFunction, TypeInformation)} constructor.
 	 *
 	 * @param name The (unique) name for the state.
-	 * @param reduceFunction The {@code ReduceFunction} used to aggregate the state.   
+	 * @param reduceFunction The {@code ReduceFunction} used to aggregate the state.
 	 * @param typeClass The type of the values in the state.
 	 */
 	public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {
@@ -84,7 +84,7 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public ReducingState<T> bind(StateBinder stateBinder) throws Exception {
 		return stateBinder.createReducingState(this);

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
index a1f7d8d..871b4a8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateBinder.java
@@ -56,8 +56,8 @@ public interface StateBinder {
 	 * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
 	 *
 	 * @param <IN> The type of the values that go into the aggregating state
-	 * @param <ACC> The type of the values that are stored in the aggregating state   
-	 * @param <OUT> The type of the values that come out of the aggregating state   
+	 * @param <ACC> The type of the values that are stored in the aggregating state
+	 * @param <OUT> The type of the values that come out of the aggregating state
 	 */
 	<IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(
 			AggregatingStateDescriptor<IN, ACC, OUT> stateDesc) throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index b603c71..841f710 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -82,7 +82,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	/** Name for queries against state created from this StateDescriptor. */
 	private String queryableStateName;
 
-	/** The default value returned by the state when no other value is bound to a key */
+	/** The default value returned by the state when no other value is bound to a key. */
 	protected transient T defaultValue;
 
 	/** The type information describing the value type. Only used to lazily create the serializer
@@ -111,7 +111,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	 * @param name The name of the {@code StateDescriptor}.
 	 * @param typeInfo The type information for the values in the state.
 	 * @param defaultValue The default value that will be set when requesting state without setting
-	 *                     a value before.   
+	 *                     a value before.
 	 */
 	protected StateDescriptor(String name, TypeInformation<T> typeInfo, T defaultValue) {
 		this.name = requireNonNull(name, "name must not be null");
@@ -301,8 +301,8 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 
 			byte[] serializedDefaultValue;
 			try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
-					DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos))
-			{
+					DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos)) {
+
 				TypeSerializer<T> duplicateSerializer = serializer.duplicate();
 				duplicateSerializer.serialize(defaultValue, outView);
 
@@ -333,8 +333,8 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 			in.readFully(buffer);
 
 			try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
-					DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais))
-			{
+					DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) {
+
 				defaultValue = serializer.deserialize(inView);
 			}
 			catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
index ac55715..777e84a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java
@@ -28,12 +28,12 @@ import java.io.IOException;
  *
  * <p>The state is accessed and modified by user functions, and checkpointed consistently
  * by the system as part of the distributed snapshots.
- * 
+ *
  * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
  * automatically supplied by the system, so the function always sees the value mapped to the
  * key of the current element. That way, the system can handle stream and state partitioning
  * consistently together.
- * 
+ *
  * @param <T> Type of the value in the state.
  */
 @PublicEvolving
@@ -50,7 +50,7 @@ public interface ValueState<T> extends State {
 	 * this will return {@code null} when no value was previously set using {@link #update(Object)}.
 	 *
 	 * @return The state value corresponding to the current input.
-	 * 
+	 *
 	 * @throws IOException Thrown if the system cannot access the state.
 	 */
 	T value() throws IOException;
@@ -59,13 +59,13 @@ public interface ValueState<T> extends State {
 	 * Updates the operator state accessible by {@link #value()} to the given
 	 * value. The next time {@link #value()} is called (for the same state
 	 * partition) the returned state will represent the updated value. When a
-	 * partitioned state is updated with null, the state for the current key 
+	 * partitioned state is updated with null, the state for the current key
 	 * will be removed and the default value is returned on the next access.
-	 * 
+	 *
 	 * @param value The new value for the state.
-	 *            
+	 *
 	 * @throws IOException Thrown if the system cannot access the state.
 	 */
 	void update(T value) throws IOException;
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
index 3afc8a7..ef18d74 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -34,11 +34,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
  */
 @PublicEvolving
 public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
+
 	private static final long serialVersionUID = 1L;
-	
+
 	/**
 	 * Creates a new {@code ValueStateDescriptor} with the given name, type, and default value.
-	 * 
+	 *
 	 * <p>If this constructor fails (because it is not possible to describe the type via a class),
 	 * consider using the {@link #ValueStateDescriptor(String, TypeInformation, Object)} constructor.
 	 *
@@ -46,7 +47,7 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
 	 * the default value by checking whether the contents of the state is {@code null}.
 	 *
 	 * @param name The (unique) name for the state.
-	 * @param typeClass The type of the values in the state.   
+	 * @param typeClass The type of the values in the state.
 	 * @param defaultValue The default value that will be set when requesting state without setting
 	 *                     a value before.
 	 */
@@ -122,7 +123,7 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public ValueState<T> bind(StateBinder stateBinder) throws Exception {
 		return stateBinder.createValueState(this);

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java
index 1b27ebd..155f23a 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.state;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -29,12 +30,15 @@ import static org.junit.Assert.assertNotSame;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for the {@link AggregatingStateDescriptor}.
+ */
 public class AggregatingStateDescriptorTest extends TestLogger {
 
 	/**
-	 * FLINK-6775
+	 * FLINK-6775.
 	 *
-	 * Tests that the returned serializer is duplicated. This allows to
+	 * <p>Tests that the returned serializer is duplicated. This allows to
 	 * share the state descriptor.
 	 */
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
index 0b230ad..c6d086e 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
@@ -41,16 +41,19 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for the {@link ListStateDescriptor}.
+ */
 public class ListStateDescriptorTest {
-	
+
 	@Test
 	public void testValueStateDescriptorEagerSerializer() throws Exception {
 
 		TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
-		
-		ListStateDescriptor<String> descr = 
-				new ListStateDescriptor<String>("testName", serializer);
-		
+
+		ListStateDescriptor<String> descr =
+				new ListStateDescriptor<>("testName", serializer);
+
 		assertEquals("testName", descr.getName());
 		assertNotNull(descr.getSerializer());
 		assertTrue(descr.getSerializer() instanceof ListSerializer);
@@ -74,8 +77,8 @@ public class ListStateDescriptorTest {
 		cfg.registerKryoType(TaskInfo.class);
 
 		ListStateDescriptor<Path> descr =
-				new ListStateDescriptor<Path>("testName", Path.class);
-		
+				new ListStateDescriptor<>("testName", Path.class);
+
 		try {
 			descr.getSerializer();
 			fail("should cause an exception");
@@ -96,7 +99,7 @@ public class ListStateDescriptorTest {
 	public void testValueStateDescriptorAutoSerializer() throws Exception {
 
 		ListStateDescriptor<String> descr =
-				new ListStateDescriptor<String>("testName", String.class);
+				new ListStateDescriptor<>("testName", String.class);
 
 		ListStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
 
@@ -110,9 +113,9 @@ public class ListStateDescriptorTest {
 	}
 
 	/**
-	 * FLINK-6775
+	 * FLINK-6775.
 	 *
-	 * Tests that the returned serializer is duplicated. This allows to
+	 * <p>Tests that the returned serializer is duplicated. This allows to
 	 * share the state descriptor.
 	 */
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
index d710911..e2aa940 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
@@ -42,17 +42,20 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for the {@link MapStateDescriptor}.
+ */
 public class MapStateDescriptorTest {
-	
+
 	@Test
 	public void testMapStateDescriptorEagerSerializer() throws Exception {
 
 		TypeSerializer<Integer> keySerializer = new KryoSerializer<>(Integer.class, new ExecutionConfig());
 		TypeSerializer<String> valueSerializer = new KryoSerializer<>(String.class, new ExecutionConfig());
-		
-		MapStateDescriptor<Integer, String> descr = 
+
+		MapStateDescriptor<Integer, String> descr =
 				new MapStateDescriptor<>("testName", keySerializer, valueSerializer);
-		
+
 		assertEquals("testName", descr.getName());
 		assertNotNull(descr.getSerializer());
 		assertTrue(descr.getSerializer() instanceof MapSerializer);
@@ -81,7 +84,7 @@ public class MapStateDescriptorTest {
 
 		MapStateDescriptor<Path, String> descr =
 				new MapStateDescriptor<>("testName", Path.class, String.class);
-		
+
 		try {
 			descr.getSerializer();
 			fail("should cause an exception");
@@ -96,7 +99,7 @@ public class MapStateDescriptorTest {
 		assertTrue(descr.getKeySerializer() instanceof KryoSerializer);
 
 		assertTrue(((KryoSerializer<?>) descr.getKeySerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
-		
+
 		assertNotNull(descr.getValueSerializer());
 		assertTrue(descr.getValueSerializer() instanceof StringSerializer);
 	}
@@ -121,9 +124,9 @@ public class MapStateDescriptorTest {
 	}
 
 	/**
-	 * FLINK-6775
+	 * FLINK-6775.
 	 *
-	 * Tests that the returned serializer is duplicated. This allows to
+	 * <p>Tests that the returned serializer is duplicated. This allows to
 	 * share the state descriptor.
 	 */
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
index aec7140..ef39f14 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
@@ -26,8 +26,8 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
-
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -36,24 +36,26 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
-
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for the {@link ReducingStateDescriptor}.
+ */
 public class ReducingStateDescriptorTest extends TestLogger {
-	
+
 	@Test
 	public void testValueStateDescriptorEagerSerializer() throws Exception {
 
 		@SuppressWarnings("unchecked")
-		ReduceFunction<String> reducer = mock(ReduceFunction.class); 
-		
+		ReduceFunction<String> reducer = mock(ReduceFunction.class);
+
 		TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
-		
-		ReducingStateDescriptor<String> descr = 
-				new ReducingStateDescriptor<String>("testName", reducer, serializer);
-		
+
+		ReducingStateDescriptor<String> descr =
+				new ReducingStateDescriptor<>("testName", reducer, serializer);
+
 		assertEquals("testName", descr.getName());
 		assertNotNull(descr.getSerializer());
 		assertEquals(serializer, descr.getSerializer());
@@ -70,13 +72,13 @@ public class ReducingStateDescriptorTest extends TestLogger {
 
 		@SuppressWarnings("unchecked")
 		ReduceFunction<Path> reducer = mock(ReduceFunction.class);
-		
+
 		// some different registered value
 		ExecutionConfig cfg = new ExecutionConfig();
 		cfg.registerKryoType(TaskInfo.class);
 
 		ReducingStateDescriptor<Path> descr =
-				new ReducingStateDescriptor<Path>("testName", reducer, Path.class);
+				new ReducingStateDescriptor<>("testName", reducer, Path.class);
 
 		try {
 			descr.getSerializer();
@@ -84,7 +86,7 @@ public class ReducingStateDescriptorTest extends TestLogger {
 		} catch (IllegalStateException ignored) {}
 
 		descr.initializeSerializerUnlessSet(cfg);
-		
+
 		assertNotNull(descr.getSerializer());
 		assertTrue(descr.getSerializer() instanceof KryoSerializer);
 
@@ -98,7 +100,7 @@ public class ReducingStateDescriptorTest extends TestLogger {
 		ReduceFunction<String> reducer = mock(ReduceFunction.class);
 
 		ReducingStateDescriptor<String> descr =
-				new ReducingStateDescriptor<String>("testName", reducer, String.class);
+				new ReducingStateDescriptor<>("testName", reducer, String.class);
 
 		ReducingStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
 
@@ -108,9 +110,9 @@ public class ReducingStateDescriptorTest extends TestLogger {
 	}
 
 	/**
-	 * FLINK-6775
+	 * FLINK-6775.
 	 *
-	 * Tests that the returned serializer is duplicated. This allows to
+	 * <p>Tests that the returned serializer is duplicated. This allows to
 	 * share the state descriptor.
 	 */
 	@SuppressWarnings("unchecked")
@@ -134,5 +136,5 @@ public class ReducingStateDescriptorTest extends TestLogger {
 		// check that the retrieved serializers are not the same
 		assertNotSame(serializerA, serializerB);
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
index e434e01..b43e5ad 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
@@ -26,8 +26,8 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
-
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -42,17 +42,20 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for the {@link ValueStateDescriptor}.
+ */
 public class ValueStateDescriptorTest extends TestLogger {
-	
+
 	@Test
 	public void testValueStateDescriptorEagerSerializer() throws Exception {
 
 		TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
 		String defaultValue = "le-value-default";
-		
-		ValueStateDescriptor<String> descr = 
-				new ValueStateDescriptor<String>("testName", serializer, defaultValue);
-		
+
+		ValueStateDescriptor<String> descr =
+				new ValueStateDescriptor<>("testName", serializer, defaultValue);
+
 		assertEquals("testName", descr.getName());
 		assertEquals(defaultValue, descr.getDefaultValue());
 		assertNotNull(descr.getSerializer());
@@ -68,16 +71,16 @@ public class ValueStateDescriptorTest extends TestLogger {
 
 	@Test
 	public void testValueStateDescriptorLazySerializer() throws Exception {
-		
+
 		// some default value that goes to the generic serializer
 		Path defaultValue = new Path(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).toURI());
-		
+
 		// some different registered value
 		ExecutionConfig cfg = new ExecutionConfig();
 		cfg.registerKryoType(TaskInfo.class);
 
 		ValueStateDescriptor<Path> descr =
-				new ValueStateDescriptor<Path>("testName", Path.class, defaultValue);
+				new ValueStateDescriptor<>("testName", Path.class, defaultValue);
 
 		try {
 			descr.getSerializer();
@@ -85,7 +88,7 @@ public class ValueStateDescriptorTest extends TestLogger {
 		} catch (IllegalStateException ignored) {}
 
 		descr.initializeSerializerUnlessSet(cfg);
-		
+
 		assertNotNull(descr.getSerializer());
 		assertTrue(descr.getSerializer() instanceof KryoSerializer);
 
@@ -94,11 +97,11 @@ public class ValueStateDescriptorTest extends TestLogger {
 
 	@Test
 	public void testValueStateDescriptorAutoSerializer() throws Exception {
-		
+
 		String defaultValue = "le-value-default";
 
 		ValueStateDescriptor<String> descr =
-				new ValueStateDescriptor<String>("testName", String.class, defaultValue);
+				new ValueStateDescriptor<>("testName", String.class, defaultValue);
 
 		ValueStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
 
@@ -122,7 +125,7 @@ public class ValueStateDescriptorTest extends TestLogger {
 		String defaultValue = new String(data, ConfigConstants.DEFAULT_CHARSET);
 
 		ValueStateDescriptor<String> descr =
-				new ValueStateDescriptor<String>("testName", serializer, defaultValue);
+				new ValueStateDescriptor<>("testName", serializer, defaultValue);
 
 		assertEquals("testName", descr.getName());
 		assertEquals(defaultValue, descr.getDefaultValue());
@@ -138,9 +141,9 @@ public class ValueStateDescriptorTest extends TestLogger {
 	}
 
 	/**
-	 * FLINK-6775
+	 * FLINK-6775.
 	 *
-	 * Tests that the returned serializer is duplicated. This allows to
+	 * <p>Tests that the returned serializer is duplicated. This allows to
 	 * share the state descriptor.
 	 */
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/b9aec0ee/tools/maven/suppressions-core.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions-core.xml b/tools/maven/suppressions-core.xml
index e613fb0..ff9c203 100644
--- a/tools/maven/suppressions-core.xml
+++ b/tools/maven/suppressions-core.xml
@@ -72,10 +72,6 @@ under the License.
 		checks="AvoidStarImport"/>
 
 	<suppress
-		files="(.*)api[/\\]common[/\\]state[/\\](.*)"
-		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-
-	<suppress
 		files="(.*)api[/\\]common[/\\]typeutils[/\\](.*)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
 	<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->


[6/7] flink git commit: [hotfix] [core] Consolidate serializer duplication tests in StateDescriptorTest where possible

Posted by se...@apache.org.
[hotfix] [core] Consolidate serializer duplication tests in StateDescriptorTest where possible


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/effe7d7b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/effe7d7b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/effe7d7b

Branch: refs/heads/release-1.5
Commit: effe7d7becf054c352b5a4f5e9330efc1af2c0da
Parents: 586eb10
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 20 16:46:13 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 22 16:04:47 2018 +0100

----------------------------------------------------------------------
 .../state/AggregatingStateDescriptorTest.java   | 62 --------------------
 .../common/state/ListStateDescriptorTest.java   |  1 -
 .../state/ReducingStateDescriptorTest.java      | 27 ---------
 .../api/common/state/StateDescriptorTest.java   | 30 ++++++++++
 .../common/state/ValueStateDescriptorTest.java  | 23 --------
 5 files changed, 30 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/effe7d7b/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java
deleted file mode 100644
index f62acc8..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.AggregateFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertNotSame;
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the {@link AggregatingStateDescriptor}.
- */
-public class AggregatingStateDescriptorTest extends TestLogger {
-
-	/**
-	 * FLINK-6775.
-	 *
-	 * <p>Tests that the returned serializer is duplicated. This allows to
-	 * share the state descriptor.
-	 */
-	@Test
-	public void testSerializerDuplication() {
-		// we need a serializer that actually duplicates for testing (a stateful one)
-		// we use Kryo here, because it meets these conditions
-		TypeSerializer<Long> serializer = new KryoSerializer<>(Long.class, new ExecutionConfig());
-
-		AggregateFunction<Long, Long, Long> aggregatingFunction = mock(AggregateFunction.class);
-
-		AggregatingStateDescriptor<Long, Long, Long> descr = new AggregatingStateDescriptor<>(
-			"foobar",
-			aggregatingFunction,
-			serializer);
-
-		TypeSerializer<Long> serializerA = descr.getSerializer();
-		TypeSerializer<Long> serializerB = descr.getSerializer();
-
-		// check that the retrieved serializers are not the same
-		assertNotSame(serializerA, serializerB);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/effe7d7b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
index e7e33e7..b934ee0 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
@@ -68,7 +68,6 @@ public class ListStateDescriptorTest {
 	 * <p>Tests that the returned serializer is duplicated. This allows to
 	 * share the state descriptor.
 	 */
-	@SuppressWarnings("unchecked")
 	@Test
 	public void testSerializerDuplication() {
 		// we need a serializer that actually duplicates for testing (a stateful one)

http://git-wip-us.apache.org/repos/asf/flink/blob/effe7d7b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
index 81b7c38..5d9eba5 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
@@ -29,7 +29,6 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
 
 /**
  * Tests for the {@link ReducingStateDescriptor}.
@@ -57,30 +56,4 @@ public class ReducingStateDescriptorTest extends TestLogger {
 		assertNotNull(copy.getSerializer());
 		assertEquals(serializer, copy.getSerializer());
 	}
-
-	/**
-	 * FLINK-6775.
-	 *
-	 * <p>Tests that the returned serializer is duplicated. This allows to
-	 * share the state descriptor.
-	 */
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testSerializerDuplication() {
-		// we need a serializer that actually duplicates for testing (a stateful one)
-		// we use Kryo here, because it meets these conditions
-		TypeSerializer<String> statefulSerializer = new KryoSerializer<>(String.class, new ExecutionConfig());
-
-		ReducingStateDescriptor<String> descr = new ReducingStateDescriptor<>(
-				"foobar",
-				(a, b) -> a,
-				statefulSerializer);
-
-		TypeSerializer<String> serializerA = descr.getSerializer();
-		TypeSerializer<String> serializerB = descr.getSerializer();
-
-		// check that the retrieved serializers are not the same
-		assertNotSame(serializerA, serializerB);
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/effe7d7b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
index 59293f4..cf5327e 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
@@ -33,6 +33,7 @@ import java.io.File;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -41,6 +42,10 @@ import static org.junit.Assert.fail;
  */
 public class StateDescriptorTest {
 
+	// ------------------------------------------------------------------------
+	//  Tests for serializer initialization
+	// ------------------------------------------------------------------------
+
 	@Test
 	public void testInitializeWithSerializer() throws Exception {
 		final TypeSerializer<String> serializer = StringSerializer.INSTANCE;
@@ -131,6 +136,31 @@ public class StateDescriptorTest {
 	}
 
 	// ------------------------------------------------------------------------
+	//  Tests for serializer duplication
+	// ------------------------------------------------------------------------
+
+	/**
+	 * FLINK-6775, tests that the returned serializer is duplicated.
+	 * This allows the state descriptor to be shared across threads.
+	 */
+	@Test
+	public void testSerializerDuplication() throws Exception {
+		// we need a serializer that actually duplicates for testing (a stateful one)
+		// we use Kryo here, because it meets these conditions
+		TypeSerializer<String> statefulSerializer = new KryoSerializer<>(String.class, new ExecutionConfig());
+
+		TestStateDescriptor<String> descr = new TestStateDescriptor<>("foobar", statefulSerializer);
+
+		TypeSerializer<String> serializerA = descr.getSerializer();
+		TypeSerializer<String> serializerB = descr.getSerializer();
+
+		// check that the retrieved serializers are not the same
+		assertNotSame(serializerA, serializerB);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Mock implementations and test types
+	// ------------------------------------------------------------------------
 
 	private static class TestStateDescriptor<T> extends StateDescriptor<State, T> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/effe7d7b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
index 7ee58fe..67114e5 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
@@ -29,7 +29,6 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
 
 /**
  * Tests for the {@link ValueStateDescriptor}.
@@ -64,26 +63,4 @@ public class ValueStateDescriptorTest extends TestLogger {
 		assertNotNull(copy.getSerializer());
 		assertEquals(serializer, copy.getSerializer());
 	}
-
-	/**
-	 * FLINK-6775.
-	 *
-	 * <p>Tests that the returned serializer is duplicated. This allows to
-	 * share the state descriptor.
-	 */
-	@SuppressWarnings("unchecked")
-	@Test
-	public void testSerializerDuplication() {
-		// we need a serializer that actually duplicates for testing (a stateful one)
-		// we use Kryo here, because it meets these conditions
-		TypeSerializer<String> statefulSerializer = new KryoSerializer<>(String.class, new ExecutionConfig());
-
-		ValueStateDescriptor<String> descr = new ValueStateDescriptor<>("foobar", statefulSerializer);
-
-		TypeSerializer<String> serializerA = descr.getSerializer();
-		TypeSerializer<String> serializerB = descr.getSerializer();
-
-		// check that the retrieved serializers are not the same
-		assertNotSame(serializerA, serializerB);
-	}
 }


[7/7] flink git commit: [FLINK-9035] [core] Fix state descriptor equals() and hashCode() handling

Posted by se...@apache.org.
[FLINK-9035] [core] Fix state descriptor equals() and hashCode() handling


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69e5d146
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69e5d146
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69e5d146

Branch: refs/heads/release-1.5
Commit: 69e5d146219bbded4bb6cc472ff015996c1aceb7
Parents: effe7d7
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 20 17:16:06 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 22 16:04:49 2018 +0100

----------------------------------------------------------------------
 .../state/AggregatingStateDescriptor.java       | 31 ---------
 .../common/state/FoldingStateDescriptor.java    | 31 ---------
 .../api/common/state/ListStateDescriptor.java   | 30 ---------
 .../api/common/state/MapStateDescriptor.java    | 29 --------
 .../common/state/ReducingStateDescriptor.java   | 30 ---------
 .../flink/api/common/state/StateDescriptor.java | 17 ++++-
 .../api/common/state/ValueStateDescriptor.java  | 31 ---------
 .../common/state/ListStateDescriptorTest.java   | 28 ++++++++
 .../common/state/MapStateDescriptorTest.java    | 29 ++++++++
 .../state/ReducingStateDescriptorTest.java      | 29 ++++++++
 .../api/common/state/StateDescriptorTest.java   | 69 ++++++++++++++++++--
 .../common/state/ValueStateDescriptorTest.java  | 28 ++++++++
 12 files changed, 193 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
index 6f6d2f9..8c7fed6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
@@ -111,35 +111,4 @@ public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<Ag
 	public Type getType() {
 		return Type.AGGREGATING;
 	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		else if (o != null && getClass() == o.getClass()) {
-			AggregatingStateDescriptor<?, ?, ?> that = (AggregatingStateDescriptor<?, ?, ?>) o;
-			return serializer.equals(that.serializer) && name.equals(that.name);
-		}
-		else {
-			return false;
-		}
-	}
-
-	@Override
-	public int hashCode() {
-		int result = serializer.hashCode();
-		result = 31 * result + name.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return "AggregatingStateDescriptor{" +
-				"serializer=" + serializer +
-				", aggFunction=" + aggFunction +
-				'}';
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index 261d1fe..c14e4bf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -112,37 +112,6 @@ public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState
 	}
 
 	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		FoldingStateDescriptor<?, ?> that = (FoldingStateDescriptor<?, ?>) o;
-
-		return serializer.equals(that.serializer) && name.equals(that.name);
-
-	}
-
-	@Override
-	public int hashCode() {
-		int result = serializer.hashCode();
-		result = 31 * result + name.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return "FoldingStateDescriptor{" +
-				"serializer=" + serializer +
-				", initialValue=" + defaultValue +
-				", foldFunction=" + foldFunction +
-				'}';
-	}
-
-	@Override
 	public Type getType() {
 		return Type.FOLDING;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index 38e5680..aa5e64b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -102,34 +102,4 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
 	public Type getType() {
 		return Type.LIST;
 	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		final ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
-		return serializer.equals(that.serializer) && name.equals(that.name);
-
-	}
-
-	@Override
-	public int hashCode() {
-		int result = serializer.hashCode();
-		result = 31 * result + name.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return "ListStateDescriptor{" +
-				"serializer=" + serializer +
-				'}';
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
index 087cb54..42b016a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
@@ -117,33 +117,4 @@ public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>
 
 		return ((MapSerializer<UK, UV>) rawSerializer).getValueSerializer();
 	}
-
-	@Override
-	public int hashCode() {
-		int result = serializer.hashCode();
-		result = 31 * result + name.hashCode();
-		return result;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		MapStateDescriptor<?, ?> that = (MapStateDescriptor<?, ?>) o;
-		return serializer.equals(that.serializer) && name.equals(that.name);
-	}
-
-	@Override
-	public String toString() {
-		return "MapStateDescriptor{" +
-				"name=" + name +
-				", serializer=" + serializer +
-				'}';
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
index ef483e2..0df1c2c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
@@ -98,36 +98,6 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>
 	}
 
 	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		ReducingStateDescriptor<?> that = (ReducingStateDescriptor<?>) o;
-
-		return serializer.equals(that.serializer) && name.equals(that.name);
-
-	}
-
-	@Override
-	public int hashCode() {
-		int result = serializer.hashCode();
-		result = 31 * result + name.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return "ReducingStateDescriptor{" +
-				"serializer=" + serializer +
-				", reduceFunction=" + reduceFunction +
-				'}';
-	}
-
-	@Override
 	public Type getType() {
 		return Type.REDUCING;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 574c836..9b6b51d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -273,10 +273,23 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	// ------------------------------------------------------------------------
 
 	@Override
-	public abstract int hashCode();
+	public final int hashCode() {
+		return name.hashCode() + 31 * getClass().hashCode();
+	}
 
 	@Override
-	public abstract boolean equals(Object o);
+	public final boolean equals(Object o) {
+		if (o == this) {
+			return true;
+		}
+		else if (o != null && o.getClass() == this.getClass()) {
+			final StateDescriptor<?, ?> that = (StateDescriptor<?, ?>) o;
+			return this.name.equals(that.name);
+		}
+		else {
+			return false;
+		}
+	}
 
 	@Override
 	public String toString() {

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
index ef18d74..4d69d81 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -130,37 +130,6 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
 	}
 
 	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		ValueStateDescriptor<?> that = (ValueStateDescriptor<?>) o;
-
-		return serializer.equals(that.serializer) && name.equals(that.name);
-
-	}
-
-	@Override
-	public int hashCode() {
-		int result = serializer.hashCode();
-		result = 31 * result + name.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return "ValueStateDescriptor{" +
-				"name=" + name +
-				", defaultValue=" + defaultValue +
-				", serializer=" + serializer +
-				'}';
-	}
-
-	@Override
 	public Type getType() {
 		return Type.VALUE;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
index b934ee0..cb6f608 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.state;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
@@ -62,6 +63,33 @@ public class ListStateDescriptorTest {
 		assertEquals(serializer, copy.getElementSerializer());
 	}
 
+	@Test
+	public void testHashCodeEquals() throws Exception {
+		final String name = "testName";
+
+		ListStateDescriptor<String> original = new ListStateDescriptor<>(name, String.class);
+		ListStateDescriptor<String> same = new ListStateDescriptor<>(name, String.class);
+		ListStateDescriptor<String> sameBySerializer = new ListStateDescriptor<>(name, StringSerializer.INSTANCE);
+
+		// test that hashCode() works on state descriptors with initialized and uninitialized serializers
+		assertEquals(original.hashCode(), same.hashCode());
+		assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+		assertEquals(original, same);
+		assertEquals(original, sameBySerializer);
+
+		// equality with a clone
+		ListStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(original);
+		assertEquals(original, clone);
+
+		// equality with a clone whose serializer has been initialized
+		clone.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, clone);
+
+		original.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, same);
+	}
+
 	/**
 	 * FLINK-6775.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
index 4e64c0f..069d6c2 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.state;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
@@ -67,6 +68,34 @@ public class MapStateDescriptorTest {
 		assertEquals(valueSerializer, copy.getValueSerializer());
 	}
 
+	@Test
+	public void testHashCodeEquals() throws Exception {
+		final String name = "testName";
+
+		MapStateDescriptor<String, String> original = new MapStateDescriptor<>(name, String.class, String.class);
+		MapStateDescriptor<String, String> same = new MapStateDescriptor<>(name, String.class, String.class);
+		MapStateDescriptor<String, String> sameBySerializer =
+				new MapStateDescriptor<>(name, StringSerializer.INSTANCE, StringSerializer.INSTANCE);
+
+		// test that hashCode() works on state descriptors with initialized and uninitialized serializers
+		assertEquals(original.hashCode(), same.hashCode());
+		assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+		assertEquals(original, same);
+		assertEquals(original, sameBySerializer);
+
+		// equality with a clone
+		MapStateDescriptor<String, String> clone = CommonTestUtils.createCopySerializable(original);
+		assertEquals(original, clone);
+
+		// equality with a clone whose serializer has been initialized
+		clone.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, clone);
+
+		original.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, same);
+	}
+
 	/**
 	 * FLINK-6775.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
index 5d9eba5..89aa1e6 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.state;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.TestLogger;
@@ -56,4 +57,32 @@ public class ReducingStateDescriptorTest extends TestLogger {
 		assertNotNull(copy.getSerializer());
 		assertEquals(serializer, copy.getSerializer());
 	}
+
+	@Test
+	public void testHashCodeEquals() throws Exception {
+		final String name = "testName";
+		final ReduceFunction<String> reducer = (a, b) -> a;
+
+		ReducingStateDescriptor<String> original = new ReducingStateDescriptor<>(name, reducer, String.class);
+		ReducingStateDescriptor<String> same = new ReducingStateDescriptor<>(name, reducer, String.class);
+		ReducingStateDescriptor<String> sameBySerializer = new ReducingStateDescriptor<>(name, reducer, StringSerializer.INSTANCE);
+
+		// test that hashCode() works on state descriptors with initialized and uninitialized serializers
+		assertEquals(original.hashCode(), same.hashCode());
+		assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+		assertEquals(original, same);
+		assertEquals(original, sameBySerializer);
+
+		// equality with a clone
+		ReducingStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(original);
+		assertEquals(original, clone);
+
+		// equality with a clone whose serializer has been initialized
+		clone.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, clone);
+
+		original.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, same);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
index cf5327e..3958baa 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
@@ -32,6 +32,7 @@ import java.io.File;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
@@ -159,6 +160,47 @@ public class StateDescriptorTest {
 	}
 
 	// ------------------------------------------------------------------------
+	//  Test hashCode() and equals()
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testHashCodeAndEquals() throws Exception {
+		final String name = "testName";
+
+		TestStateDescriptor<String> original = new TestStateDescriptor<>(name, String.class);
+		TestStateDescriptor<String> same = new TestStateDescriptor<>(name, String.class);
+		TestStateDescriptor<String> sameBySerializer = new TestStateDescriptor<>(name, StringSerializer.INSTANCE);
+
+		// test that hashCode() works on state descriptors with initialized and uninitialized serializers
+		assertEquals(original.hashCode(), same.hashCode());
+		assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+		assertEquals(original, same);
+		assertEquals(original, sameBySerializer);
+
+		// equality with a clone
+		TestStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(original);
+		assertEquals(original, clone);
+
+		// equality with a clone whose serializer has been initialized
+		clone.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, clone);
+
+		original.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, same);
+	}
+
+	@Test
+	public void testEqualsSameNameAndTypeDifferentClass() throws Exception {
+		final String name = "test name";
+
+		final TestStateDescriptor<String> descr1 = new TestStateDescriptor<>(name, String.class);
+		final OtherTestStateDescriptor<String> descr2 = new OtherTestStateDescriptor<>(name, String.class);
+
+		assertNotEquals(descr1, descr2);
+	}
+
+	// ------------------------------------------------------------------------
 	//  Mock implementations and test types
 	// ------------------------------------------------------------------------
 
@@ -185,17 +227,34 @@ public class StateDescriptorTest {
 
 		@Override
 		public Type getType() {
-			throw new UnsupportedOperationException();
+			return Type.VALUE;
+		}
+	}
+
+	private static class OtherTestStateDescriptor<T> extends StateDescriptor<State, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		OtherTestStateDescriptor(String name, TypeSerializer<T> serializer) {
+			super(name, serializer, null);
+		}
+
+		OtherTestStateDescriptor(String name, TypeInformation<T> typeInfo) {
+			super(name, typeInfo, null);
+		}
+
+		OtherTestStateDescriptor(String name, Class<T> type) {
+			super(name, type, null);
 		}
 
 		@Override
-		public int hashCode() {
-			return 584523;
+		public State bind(StateBinder stateBinder) throws Exception {
+			throw new UnsupportedOperationException();
 		}
 
 		@Override
-		public boolean equals(Object o) {
-			return o != null && o.getClass() == TestStateDescriptor.class;
+		public Type getType() {
+			return Type.VALUE;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
index 67114e5..3870da0 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.testutils.CommonTestUtils;
@@ -36,6 +37,33 @@ import static org.junit.Assert.assertNotNull;
 public class ValueStateDescriptorTest extends TestLogger {
 
 	@Test
+	public void testHashCodeEquals() throws Exception {
+		final String name = "testName";
+
+		ValueStateDescriptor<String> original = new ValueStateDescriptor<>(name, String.class);
+		ValueStateDescriptor<String> same = new ValueStateDescriptor<>(name, String.class);
+		ValueStateDescriptor<String> sameBySerializer = new ValueStateDescriptor<>(name, StringSerializer.INSTANCE);
+
+		// test that hashCode() works on state descriptors with initialized and uninitialized serializers
+		assertEquals(original.hashCode(), same.hashCode());
+		assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+		assertEquals(original, same);
+		assertEquals(original, sameBySerializer);
+
+		// equality with a clone
+		ValueStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(original);
+		assertEquals(original, clone);
+
+		// equality with a clone whose serializer has been initialized
+		clone.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, clone);
+
+		original.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, same);
+	}
+
+	@Test
 	public void testVeryLargeDefaultValue() throws Exception {
 		// ensure that we correctly read very large data when deserializing the default value
 


[3/7] flink git commit: [hotfix] [core] Demockitofy state descriptor tests

Posted by se...@apache.org.
[hotfix] [core] Demockitofy state descriptor tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e57a1d2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e57a1d2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e57a1d2

Branch: refs/heads/release-1.5
Commit: 9e57a1d220fed6bb8d63166e77ae8a5619e37878
Parents: 172af4c
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 20 15:36:19 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 22 16:04:39 2018 +0100

----------------------------------------------------------------------
 .../state/AggregatingStateDescriptorTest.java   | 16 +++++---------
 .../common/state/ListStateDescriptorTest.java   | 14 +++---------
 .../common/state/MapStateDescriptorTest.java    | 23 ++++----------------
 .../state/ReducingStateDescriptorTest.java      | 22 +++++++------------
 .../common/state/ValueStateDescriptorTest.java  | 14 +++---------
 5 files changed, 23 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e57a1d2/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java
index 155f23a..f62acc8 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDescriptorTest.java
@@ -18,17 +18,16 @@
 
 package org.apache.flink.api.common.state;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import static org.junit.Assert.assertNotSame;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link AggregatingStateDescriptor}.
@@ -41,16 +40,11 @@ public class AggregatingStateDescriptorTest extends TestLogger {
 	 * <p>Tests that the returned serializer is duplicated. This allows to
 	 * share the state descriptor.
 	 */
-	@SuppressWarnings("unchecked")
 	@Test
 	public void testSerializerDuplication() {
-		TypeSerializer<Long> serializer = mock(TypeSerializer.class);
-		when(serializer.duplicate()).thenAnswer(new Answer<TypeSerializer<Long>>() {
-			@Override
-			public TypeSerializer<Long> answer(InvocationOnMock invocation) throws Throwable {
-				return mock(TypeSerializer.class);
-			}
-		});
+		// we need a serializer that actually duplicates for testing (a stateful one)
+		// we use Kryo here, because it meets these conditions
+		TypeSerializer<Long> serializer = new KryoSerializer<>(Long.class, new ExecutionConfig());
 
 		AggregateFunction<Long, Long, Long> aggregatingFunction = mock(AggregateFunction.class);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9e57a1d2/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
index c6d086e..f45d296 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
@@ -28,8 +28,6 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.util.List;
 
@@ -38,8 +36,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link ListStateDescriptor}.
@@ -121,13 +117,9 @@ public class ListStateDescriptorTest {
 	@SuppressWarnings("unchecked")
 	@Test
 	public void testSerializerDuplication() {
-		TypeSerializer<String> statefulSerializer = mock(TypeSerializer.class);
-		when(statefulSerializer.duplicate()).thenAnswer(new Answer<TypeSerializer<String>>() {
-			@Override
-			public TypeSerializer<String> answer(InvocationOnMock invocation) throws Throwable {
-				return mock(TypeSerializer.class);
-			}
-		});
+		// we need a serializer that actually duplicates for testing (a stateful one)
+		// we use Kryo here, because it meets these conditions
+		TypeSerializer<String> statefulSerializer = new KryoSerializer<>(String.class, new ExecutionConfig());
 
 		ListStateDescriptor<String> descr = new ListStateDescriptor<>("foobar", statefulSerializer);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9e57a1d2/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
index e2aa940..2151834 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
@@ -29,8 +29,6 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.util.Map;
 
@@ -39,8 +37,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link MapStateDescriptor}.
@@ -129,23 +125,12 @@ public class MapStateDescriptorTest {
 	 * <p>Tests that the returned serializer is duplicated. This allows to
 	 * share the state descriptor.
 	 */
-	@SuppressWarnings("unchecked")
 	@Test
 	public void testSerializerDuplication() {
-		TypeSerializer<String> keySerializer = mock(TypeSerializer.class);
-		TypeSerializer<Long> valueSerializer = mock(TypeSerializer.class);
-		when(keySerializer.duplicate()).thenAnswer(new Answer<TypeSerializer<String>>() {
-			@Override
-			public TypeSerializer<String> answer(InvocationOnMock invocation) throws Throwable {
-				return mock(TypeSerializer.class);
-			}
-		});
-		when(valueSerializer.duplicate()).thenAnswer(new Answer<TypeSerializer<Long>>() {
-			@Override
-			public TypeSerializer<Long> answer(InvocationOnMock invocation) throws Throwable {
-				return mock(TypeSerializer.class);
-			}
-		});
+		// we need a serializer that actually duplicates for testing (a stateful one)
+		// we use Kryo here, because it meets these conditions
+		TypeSerializer<String> keySerializer = new KryoSerializer<>(String.class, new ExecutionConfig());
+		TypeSerializer<Long> valueSerializer = new KryoSerializer<>(Long.class, new ExecutionConfig());
 
 		MapStateDescriptor<String, Long> descr = new MapStateDescriptor<>("foobar", keySerializer, valueSerializer);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9e57a1d2/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
index ef39f14..1e21a78 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
@@ -29,8 +29,6 @@ import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -38,7 +36,6 @@ import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link ReducingStateDescriptor}.
@@ -118,17 +115,14 @@ public class ReducingStateDescriptorTest extends TestLogger {
 	@SuppressWarnings("unchecked")
 	@Test
 	public void testSerializerDuplication() {
-		TypeSerializer<String> statefulSerializer = mock(TypeSerializer.class);
-		when(statefulSerializer.duplicate()).thenAnswer(new Answer<TypeSerializer<String>>() {
-			@Override
-			public TypeSerializer<String> answer(InvocationOnMock invocation) throws Throwable {
-				return mock(TypeSerializer.class);
-			}
-		});
-
-		ReduceFunction<String> reducer = mock(ReduceFunction.class);
-
-		ReducingStateDescriptor<String> descr = new ReducingStateDescriptor<>("foobar", reducer, statefulSerializer);
+		// we need a serializer that actually duplicates for testing (a stateful one)
+		// we use Kryo here, because it meets these conditions
+		TypeSerializer<String> statefulSerializer = new KryoSerializer<>(String.class, new ExecutionConfig());
+
+		ReducingStateDescriptor<String> descr = new ReducingStateDescriptor<>(
+				"foobar",
+				(a, b) -> a,
+				statefulSerializer);
 
 		TypeSerializer<String> serializerA = descr.getSerializer();
 		TypeSerializer<String> serializerB = descr.getSerializer();

http://git-wip-us.apache.org/repos/asf/flink/blob/9e57a1d2/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
index b43e5ad..f3b9eee 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
@@ -29,8 +29,6 @@ import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.io.File;
 
@@ -39,8 +37,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link ValueStateDescriptor}.
@@ -149,13 +145,9 @@ public class ValueStateDescriptorTest extends TestLogger {
 	@SuppressWarnings("unchecked")
 	@Test
 	public void testSerializerDuplication() {
-		TypeSerializer<String> statefulSerializer = mock(TypeSerializer.class);
-		when(statefulSerializer.duplicate()).thenAnswer(new Answer<TypeSerializer<String>>() {
-			@Override
-			public TypeSerializer<String> answer(InvocationOnMock invocation) throws Throwable {
-				return mock(TypeSerializer.class);
-			}
-		});
+		// we need a serializer that actually duplicates for testing (a stateful one)
+		// we use Kryo here, because it meets these conditions
+		TypeSerializer<String> statefulSerializer = new KryoSerializer<>(String.class, new ExecutionConfig());
 
 		ValueStateDescriptor<String> descr = new ValueStateDescriptor<>("foobar", statefulSerializer);
 


[4/7] flink git commit: [hotfix] [core] Make State Descriptors consistently use Preconditions instead of Objects.

Posted by se...@apache.org.
[hotfix] [core] Make State Descriptors consistently use Preconditions instead of Objects.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72c96157
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72c96157
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72c96157

Branch: refs/heads/release-1.5
Commit: 72c96157ed18d9203a34de5228e1e57708b3aab6
Parents: 9e57a1d
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 20 15:44:27 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 22 16:04:42 2018 +0100

----------------------------------------------------------------------
 .../api/common/state/ReducingStateDescriptor.java     |  8 ++++----
 .../flink/api/common/state/StateDescriptor.java       | 14 +++++++-------
 2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/72c96157/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
index a14b4bd..ef483e2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 
-import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * {@link StateDescriptor} for {@link ReducingState}. This can be used to create partitioned
@@ -52,7 +52,7 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>
 	 */
 	public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {
 		super(name, typeClass, null);
-		this.reduceFunction = requireNonNull(reduceFunction);
+		this.reduceFunction = checkNotNull(reduceFunction);
 
 		if (reduceFunction instanceof RichFunction) {
 			throw new UnsupportedOperationException("ReduceFunction of ReducingState can not be a RichFunction.");
@@ -68,7 +68,7 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>
 	 */
 	public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, TypeInformation<T> typeInfo) {
 		super(name, typeInfo, null);
-		this.reduceFunction = requireNonNull(reduceFunction);
+		this.reduceFunction = checkNotNull(reduceFunction);
 	}
 
 	/**
@@ -80,7 +80,7 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>
 	 */
 	public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, TypeSerializer<T> typeSerializer) {
 		super(name, typeSerializer, null);
-		this.reduceFunction = requireNonNull(reduceFunction);
+		this.reduceFunction = checkNotNull(reduceFunction);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/72c96157/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 841f710..5ec59e4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -34,7 +34,7 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 
-import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
@@ -100,8 +100,8 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	 *                     a value before.
 	 */
 	protected StateDescriptor(String name, TypeSerializer<T> serializer, T defaultValue) {
-		this.name = requireNonNull(name, "name must not be null");
-		this.serializer = requireNonNull(serializer, "serializer must not be null");
+		this.name = checkNotNull(name, "name must not be null");
+		this.serializer = checkNotNull(serializer, "serializer must not be null");
 		this.defaultValue = defaultValue;
 	}
 
@@ -114,8 +114,8 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	 *                     a value before.
 	 */
 	protected StateDescriptor(String name, TypeInformation<T> typeInfo, T defaultValue) {
-		this.name = requireNonNull(name, "name must not be null");
-		this.typeInfo = requireNonNull(typeInfo, "type information must not be null");
+		this.name = checkNotNull(name, "name must not be null");
+		this.typeInfo = checkNotNull(typeInfo, "type information must not be null");
 		this.defaultValue = defaultValue;
 	}
 
@@ -131,8 +131,8 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	 *                     a value before.
 	 */
 	protected StateDescriptor(String name, Class<T> type, T defaultValue) {
-		this.name = requireNonNull(name, "name must not be null");
-		requireNonNull(type, "type class must not be null");
+		this.name = checkNotNull(name, "name must not be null");
+		checkNotNull(type, "type class must not be null");
 
 		try {
 			this.typeInfo = TypeExtractor.createTypeInfo(type);


[5/7] flink git commit: [FLINK-9034] [core] StateDescriptor does not throw away TypeInformation upon serialization.

Posted by se...@apache.org.
[FLINK-9034] [core] StateDescriptor does not throw away TypeInformation upon serialization.

Throwing away TypeInformation upon serialization was previously done because the type
information was not serializable. Now that it is serializable, we can (and should) keep
it to provide a consistent user experience, where all serializers respect the ExecutionConfig.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/586eb100
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/586eb100
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/586eb100

Branch: refs/heads/release-1.5
Commit: 586eb10084f56524f0bf14fb90c16e696b6a941f
Parents: 72c9615
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 20 16:22:12 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 22 16:04:45 2018 +0100

----------------------------------------------------------------------
 .../flink/api/common/state/StateDescriptor.java |  41 +++--
 .../common/state/ListStateDescriptorTest.java   |  48 +-----
 .../common/state/MapStateDescriptorTest.java    |  54 +-----
 .../state/ReducingStateDescriptorTest.java      |  54 +-----
 .../api/common/state/StateDescriptorTest.java   | 171 +++++++++++++++++++
 .../common/state/ValueStateDescriptorTest.java  |  71 --------
 6 files changed, 200 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/586eb100/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 5ec59e4..574c836 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -27,6 +27,8 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -35,6 +37,7 @@ import java.io.ObjectOutputStream;
 import java.io.Serializable;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
@@ -76,19 +79,24 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	protected final String name;
 
 	/** The serializer for the type. May be eagerly initialized in the constructor,
-	 * or lazily once the type is serialized or an ExecutionConfig is provided. */
+	 * or lazily once the {@link #initializeSerializerUnlessSet(ExecutionConfig)} method
+	 * is called. */
+	@Nullable
 	protected TypeSerializer<T> serializer;
 
+	/** The type information describing the value type. Only used if the serializer
+	 * is created lazily. */
+	@Nullable
+	private TypeInformation<T> typeInfo;
+
 	/** Name for queries against state created from this StateDescriptor. */
+	@Nullable
 	private String queryableStateName;
 
 	/** The default value returned by the state when no other value is bound to a key. */
+	@Nullable
 	protected transient T defaultValue;
 
-	/** The type information describing the value type. Only used to lazily create the serializer
-	 * and dropped during serialization */
-	private transient TypeInformation<T> typeInfo;
-
 	// ------------------------------------------------------------------------
 
 	/**
@@ -99,7 +107,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	 * @param defaultValue The default value that will be set when requesting state without setting
 	 *                     a value before.
 	 */
-	protected StateDescriptor(String name, TypeSerializer<T> serializer, T defaultValue) {
+	protected StateDescriptor(String name, TypeSerializer<T> serializer, @Nullable T defaultValue) {
 		this.name = checkNotNull(name, "name must not be null");
 		this.serializer = checkNotNull(serializer, "serializer must not be null");
 		this.defaultValue = defaultValue;
@@ -113,7 +121,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	 * @param defaultValue The default value that will be set when requesting state without setting
 	 *                     a value before.
 	 */
-	protected StateDescriptor(String name, TypeInformation<T> typeInfo, T defaultValue) {
+	protected StateDescriptor(String name, TypeInformation<T> typeInfo, @Nullable T defaultValue) {
 		this.name = checkNotNull(name, "name must not be null");
 		this.typeInfo = checkNotNull(typeInfo, "type information must not be null");
 		this.defaultValue = defaultValue;
@@ -130,7 +138,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	 * @param defaultValue The default value that will be set when requesting state without setting
 	 *                     a value before.
 	 */
-	protected StateDescriptor(String name, Class<T> type, T defaultValue) {
+	protected StateDescriptor(String name, Class<T> type, @Nullable T defaultValue) {
 		this.name = checkNotNull(name, "name must not be null");
 		checkNotNull(type, "type class must not be null");
 
@@ -208,6 +216,7 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	 *
 	 * @return Queryable state name or <code>null</code> if not set.
 	 */
+	@Nullable
 	public String getQueryableStateName() {
 		return queryableStateName;
 	}
@@ -249,12 +258,13 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	 */
 	public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
 		if (serializer == null) {
-			if (typeInfo != null) {
-				serializer = typeInfo.createSerializer(executionConfig);
-			} else {
-				throw new IllegalStateException(
-						"Cannot initialize serializer after TypeInformation was dropped during serialization");
-			}
+			checkState(typeInfo != null, "no serializer and no type info");
+
+			// instantiate the serializer
+			serializer = typeInfo.createSerializer(executionConfig);
+
+			// we can drop the type info now, no longer needed
+			typeInfo  = null;
 		}
 	}
 
@@ -285,9 +295,6 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	// ------------------------------------------------------------------------
 
 	private void writeObject(final ObjectOutputStream out) throws IOException {
-		// make sure we have a serializer before the type information gets lost
-		initializeSerializerUnlessSet(new ExecutionConfig());
-
 		// write all the non-transient fields
 		out.defaultWriteObject();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/586eb100/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
index f45d296..e7e33e7 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
@@ -19,12 +19,9 @@
 package org.apache.flink.api.common.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.junit.Test;
@@ -35,7 +32,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link ListStateDescriptor}.
@@ -43,7 +39,7 @@ import static org.junit.Assert.fail;
 public class ListStateDescriptorTest {
 
 	@Test
-	public void testValueStateDescriptorEagerSerializer() throws Exception {
+	public void testListStateDescriptor() throws Exception {
 
 		TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
 
@@ -66,48 +62,6 @@ public class ListStateDescriptorTest {
 		assertEquals(serializer, copy.getElementSerializer());
 	}
 
-	@Test
-	public void testValueStateDescriptorLazySerializer() throws Exception {
-		// some different registered value
-		ExecutionConfig cfg = new ExecutionConfig();
-		cfg.registerKryoType(TaskInfo.class);
-
-		ListStateDescriptor<Path> descr =
-				new ListStateDescriptor<>("testName", Path.class);
-
-		try {
-			descr.getSerializer();
-			fail("should cause an exception");
-		} catch (IllegalStateException ignored) {}
-
-		descr.initializeSerializerUnlessSet(cfg);
-
-		assertNotNull(descr.getSerializer());
-		assertTrue(descr.getSerializer() instanceof ListSerializer);
-
-		assertNotNull(descr.getElementSerializer());
-		assertTrue(descr.getElementSerializer() instanceof KryoSerializer);
-
-		assertTrue(((KryoSerializer<?>) descr.getElementSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
-	}
-
-	@Test
-	public void testValueStateDescriptorAutoSerializer() throws Exception {
-
-		ListStateDescriptor<String> descr =
-				new ListStateDescriptor<>("testName", String.class);
-
-		ListStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
-
-		assertEquals("testName", copy.getName());
-
-		assertNotNull(copy.getSerializer());
-		assertTrue(copy.getSerializer() instanceof ListSerializer);
-
-		assertNotNull(copy.getElementSerializer());
-		assertEquals(StringSerializer.INSTANCE, copy.getElementSerializer());
-	}
-
 	/**
 	 * FLINK-6775.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/586eb100/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
index 2151834..4e64c0f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
@@ -19,13 +19,9 @@
 package org.apache.flink.api.common.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
 import org.junit.Test;
@@ -36,7 +32,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link MapStateDescriptor}.
@@ -44,7 +39,7 @@ import static org.junit.Assert.fail;
 public class MapStateDescriptorTest {
 
 	@Test
-	public void testMapStateDescriptorEagerSerializer() throws Exception {
+	public void testMapStateDescriptor() throws Exception {
 
 		TypeSerializer<Integer> keySerializer = new KryoSerializer<>(Integer.class, new ExecutionConfig());
 		TypeSerializer<String> valueSerializer = new KryoSerializer<>(String.class, new ExecutionConfig());
@@ -72,53 +67,6 @@ public class MapStateDescriptorTest {
 		assertEquals(valueSerializer, copy.getValueSerializer());
 	}
 
-	@Test
-	public void testMapStateDescriptorLazySerializer() throws Exception {
-		// some different registered value
-		ExecutionConfig cfg = new ExecutionConfig();
-		cfg.registerKryoType(TaskInfo.class);
-
-		MapStateDescriptor<Path, String> descr =
-				new MapStateDescriptor<>("testName", Path.class, String.class);
-
-		try {
-			descr.getSerializer();
-			fail("should cause an exception");
-		} catch (IllegalStateException ignored) {}
-
-		descr.initializeSerializerUnlessSet(cfg);
-
-		assertNotNull(descr.getSerializer());
-		assertTrue(descr.getSerializer() instanceof MapSerializer);
-
-		assertNotNull(descr.getKeySerializer());
-		assertTrue(descr.getKeySerializer() instanceof KryoSerializer);
-
-		assertTrue(((KryoSerializer<?>) descr.getKeySerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
-
-		assertNotNull(descr.getValueSerializer());
-		assertTrue(descr.getValueSerializer() instanceof StringSerializer);
-	}
-
-	@Test
-	public void testMapStateDescriptorAutoSerializer() throws Exception {
-
-		MapStateDescriptor<String, Long> descr =
-				new MapStateDescriptor<>("testName", String.class, Long.class);
-
-		MapStateDescriptor<String, Long> copy = CommonTestUtils.createCopySerializable(descr);
-
-		assertEquals("testName", copy.getName());
-
-		assertNotNull(copy.getSerializer());
-		assertTrue(copy.getSerializer() instanceof MapSerializer);
-
-		assertNotNull(copy.getKeySerializer());
-		assertEquals(StringSerializer.INSTANCE, copy.getKeySerializer());
-		assertNotNull(copy.getValueSerializer());
-		assertEquals(LongSerializer.INSTANCE, copy.getValueSerializer());
-	}
-
 	/**
 	 * FLINK-6775.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/586eb100/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
index 1e21a78..81b7c38 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
@@ -19,12 +19,9 @@
 package org.apache.flink.api.common.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -33,9 +30,6 @@ import org.junit.Test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
 
 /**
  * Tests for the {@link ReducingStateDescriptor}.
@@ -43,10 +37,9 @@ import static org.mockito.Mockito.mock;
 public class ReducingStateDescriptorTest extends TestLogger {
 
 	@Test
-	public void testValueStateDescriptorEagerSerializer() throws Exception {
+	public void testReducingStateDescriptor() throws Exception {
 
-		@SuppressWarnings("unchecked")
-		ReduceFunction<String> reducer = mock(ReduceFunction.class);
+		ReduceFunction<String> reducer = (a, b) -> a;
 
 		TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
 
@@ -56,6 +49,7 @@ public class ReducingStateDescriptorTest extends TestLogger {
 		assertEquals("testName", descr.getName());
 		assertNotNull(descr.getSerializer());
 		assertEquals(serializer, descr.getSerializer());
+		assertEquals(reducer, descr.getReduceFunction());
 
 		ReducingStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
 
@@ -64,48 +58,6 @@ public class ReducingStateDescriptorTest extends TestLogger {
 		assertEquals(serializer, copy.getSerializer());
 	}
 
-	@Test
-	public void testValueStateDescriptorLazySerializer() throws Exception {
-
-		@SuppressWarnings("unchecked")
-		ReduceFunction<Path> reducer = mock(ReduceFunction.class);
-
-		// some different registered value
-		ExecutionConfig cfg = new ExecutionConfig();
-		cfg.registerKryoType(TaskInfo.class);
-
-		ReducingStateDescriptor<Path> descr =
-				new ReducingStateDescriptor<>("testName", reducer, Path.class);
-
-		try {
-			descr.getSerializer();
-			fail("should cause an exception");
-		} catch (IllegalStateException ignored) {}
-
-		descr.initializeSerializerUnlessSet(cfg);
-
-		assertNotNull(descr.getSerializer());
-		assertTrue(descr.getSerializer() instanceof KryoSerializer);
-
-		assertTrue(((KryoSerializer<?>) descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
-	}
-
-	@Test
-	public void testValueStateDescriptorAutoSerializer() throws Exception {
-
-		@SuppressWarnings("unchecked")
-		ReduceFunction<String> reducer = mock(ReduceFunction.class);
-
-		ReducingStateDescriptor<String> descr =
-				new ReducingStateDescriptor<>("testName", reducer, String.class);
-
-		ReducingStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
-
-		assertEquals("testName", copy.getName());
-		assertNotNull(copy.getSerializer());
-		assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
-	}
-
 	/**
 	 * FLINK-6775.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/586eb100/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
new file mode 100644
index 0000000..59293f4
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the common/shared functionality of {@link StateDescriptor}.
+ */
+public class StateDescriptorTest {
+
+	@Test
+	public void testInitializeWithSerializer() throws Exception {
+		final TypeSerializer<String> serializer = StringSerializer.INSTANCE;
+		final TestStateDescriptor<String> descr = new TestStateDescriptor<>("test", serializer);
+
+		assertTrue(descr.isSerializerInitialized());
+		assertNotNull(descr.getSerializer());
+		assertTrue(descr.getSerializer() instanceof StringSerializer);
+
+		// this should not have any effect
+		descr.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertTrue(descr.isSerializerInitialized());
+		assertNotNull(descr.getSerializer());
+		assertTrue(descr.getSerializer() instanceof StringSerializer);
+
+		TestStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(descr);
+		assertTrue(clone.isSerializerInitialized());
+		assertNotNull(clone.getSerializer());
+		assertTrue(clone.getSerializer() instanceof StringSerializer);
+	}
+
+	@Test
+	public void testInitializeSerializerBeforeSerialization() throws Exception {
+		final TestStateDescriptor<String> descr = new TestStateDescriptor<>("test", String.class);
+
+		assertFalse(descr.isSerializerInitialized());
+		try {
+			descr.getSerializer();
+			fail("should fail with an exception");
+		} catch (IllegalStateException ignored) {}
+
+		descr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		assertTrue(descr.isSerializerInitialized());
+		assertNotNull(descr.getSerializer());
+		assertTrue(descr.getSerializer() instanceof StringSerializer);
+
+		TestStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(descr);
+
+		assertTrue(clone.isSerializerInitialized());
+		assertNotNull(clone.getSerializer());
+		assertTrue(clone.getSerializer() instanceof StringSerializer);
+	}
+
+	@Test
+	public void testInitializeSerializerAfterSerialization() throws Exception {
+		final TestStateDescriptor<String> descr = new TestStateDescriptor<>("test", String.class);
+
+		assertFalse(descr.isSerializerInitialized());
+		try {
+			descr.getSerializer();
+			fail("should fail with an exception");
+		} catch (IllegalStateException ignored) {}
+
+		TestStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(descr);
+
+		assertFalse(clone.isSerializerInitialized());
+		try {
+			clone.getSerializer();
+			fail("should fail with an exception");
+		} catch (IllegalStateException ignored) {}
+
+		clone.initializeSerializerUnlessSet(new ExecutionConfig());
+
+		assertTrue(clone.isSerializerInitialized());
+		assertNotNull(clone.getSerializer());
+		assertTrue(clone.getSerializer() instanceof StringSerializer);
+	}
+
+	@Test
+	public void testInitializeSerializerAfterSerializationWithCustomConfig() throws Exception {
+		// guard our test assumptions.
+		assertEquals("broken test assumption", -1,
+				new KryoSerializer<>(String.class, new ExecutionConfig()).getKryo()
+						.getRegistration(File.class).getId());
+
+		final ExecutionConfig config = new ExecutionConfig();
+		config.registerKryoType(File.class);
+
+		final TestStateDescriptor<Path> original = new TestStateDescriptor<>("test", Path.class);
+		TestStateDescriptor<Path> clone = CommonTestUtils.createCopySerializable(original);
+
+		clone.initializeSerializerUnlessSet(config);
+
+		// serialized one (later initialized) carries the registration
+		assertTrue(((KryoSerializer<?>) clone.getSerializer()).getKryo()
+				.getRegistration(File.class).getId() > 0);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class TestStateDescriptor<T> extends StateDescriptor<State, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		TestStateDescriptor(String name, TypeSerializer<T> serializer) {
+			super(name, serializer, null);
+		}
+
+		TestStateDescriptor(String name, TypeInformation<T> typeInfo) {
+			super(name, typeInfo, null);
+		}
+
+		TestStateDescriptor(String name, Class<T> type) {
+			super(name, type, null);
+		}
+
+		@Override
+		public State bind(StateBinder stateBinder) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public Type getType() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public int hashCode() {
+			return 584523;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			return o != null && o.getClass() == TestStateDescriptor.class;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/586eb100/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
index f3b9eee..7ee58fe 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
@@ -19,24 +19,17 @@
 package org.apache.flink.api.common.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import java.io.File;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link ValueStateDescriptor}.
@@ -44,70 +37,6 @@ import static org.junit.Assert.fail;
 public class ValueStateDescriptorTest extends TestLogger {
 
 	@Test
-	public void testValueStateDescriptorEagerSerializer() throws Exception {
-
-		TypeSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
-		String defaultValue = "le-value-default";
-
-		ValueStateDescriptor<String> descr =
-				new ValueStateDescriptor<>("testName", serializer, defaultValue);
-
-		assertEquals("testName", descr.getName());
-		assertEquals(defaultValue, descr.getDefaultValue());
-		assertNotNull(descr.getSerializer());
-		assertEquals(serializer, descr.getSerializer());
-
-		ValueStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
-
-		assertEquals("testName", copy.getName());
-		assertEquals(defaultValue, copy.getDefaultValue());
-		assertNotNull(copy.getSerializer());
-		assertEquals(serializer, copy.getSerializer());
-	}
-
-	@Test
-	public void testValueStateDescriptorLazySerializer() throws Exception {
-
-		// some default value that goes to the generic serializer
-		Path defaultValue = new Path(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).toURI());
-
-		// some different registered value
-		ExecutionConfig cfg = new ExecutionConfig();
-		cfg.registerKryoType(TaskInfo.class);
-
-		ValueStateDescriptor<Path> descr =
-				new ValueStateDescriptor<>("testName", Path.class, defaultValue);
-
-		try {
-			descr.getSerializer();
-			fail("should cause an exception");
-		} catch (IllegalStateException ignored) {}
-
-		descr.initializeSerializerUnlessSet(cfg);
-
-		assertNotNull(descr.getSerializer());
-		assertTrue(descr.getSerializer() instanceof KryoSerializer);
-
-		assertTrue(((KryoSerializer<?>) descr.getSerializer()).getKryo().getRegistration(TaskInfo.class).getId() > 0);
-	}
-
-	@Test
-	public void testValueStateDescriptorAutoSerializer() throws Exception {
-
-		String defaultValue = "le-value-default";
-
-		ValueStateDescriptor<String> descr =
-				new ValueStateDescriptor<>("testName", String.class, defaultValue);
-
-		ValueStateDescriptor<String> copy = CommonTestUtils.createCopySerializable(descr);
-
-		assertEquals("testName", copy.getName());
-		assertEquals(defaultValue, copy.getDefaultValue());
-		assertNotNull(copy.getSerializer());
-		assertEquals(StringSerializer.INSTANCE, copy.getSerializer());
-	}
-
-	@Test
 	public void testVeryLargeDefaultValue() throws Exception {
 		// ensure that we correctly read very large data when deserializing the default value
 


[2/7] flink git commit: [hotfix] [core] Add missing serialVersionUID to MapStateDescriptor

Posted by se...@apache.org.
[hotfix] [core] Add missing serialVersionUID to MapStateDescriptor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/172af4c9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/172af4c9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/172af4c9

Branch: refs/heads/release-1.5
Commit: 172af4c96eb5ab6a69ca71471551f42da548d713
Parents: b9aec0e
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 20 15:29:12 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 22 16:04:37 2018 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/api/common/state/MapStateDescriptor.java | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/172af4c9/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
index 2e7ac98..087cb54 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
@@ -42,6 +42,8 @@ import java.util.Map;
 @PublicEvolving
 public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>, Map<UK, UV>> {
 
+	private static final long serialVersionUID = 1L;
+
 	/**
 	 * Create a new {@code MapStateDescriptor} with the given name and the given type serializers.
 	 *