You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/03/22 15:22:54 UTC

[7/7] flink git commit: [FLINK-9035] [core] Fix state descriptor equals() and hashCode() handling

[FLINK-9035] [core] Fix state descriptor equals() and hashCode() handling


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69e5d146
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69e5d146
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69e5d146

Branch: refs/heads/release-1.5
Commit: 69e5d146219bbded4bb6cc472ff015996c1aceb7
Parents: effe7d7
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 20 17:16:06 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 22 16:04:49 2018 +0100

----------------------------------------------------------------------
 .../state/AggregatingStateDescriptor.java       | 31 ---------
 .../common/state/FoldingStateDescriptor.java    | 31 ---------
 .../api/common/state/ListStateDescriptor.java   | 30 ---------
 .../api/common/state/MapStateDescriptor.java    | 29 --------
 .../common/state/ReducingStateDescriptor.java   | 30 ---------
 .../flink/api/common/state/StateDescriptor.java | 17 ++++-
 .../api/common/state/ValueStateDescriptor.java  | 31 ---------
 .../common/state/ListStateDescriptorTest.java   | 28 ++++++++
 .../common/state/MapStateDescriptorTest.java    | 29 ++++++++
 .../state/ReducingStateDescriptorTest.java      | 29 ++++++++
 .../api/common/state/StateDescriptorTest.java   | 69 ++++++++++++++++++--
 .../common/state/ValueStateDescriptorTest.java  | 28 ++++++++
 12 files changed, 193 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
index 6f6d2f9..8c7fed6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
@@ -111,35 +111,4 @@ public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<Ag
 	public Type getType() {
 		return Type.AGGREGATING;
 	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		else if (o != null && getClass() == o.getClass()) {
-			AggregatingStateDescriptor<?, ?, ?> that = (AggregatingStateDescriptor<?, ?, ?>) o;
-			return serializer.equals(that.serializer) && name.equals(that.name);
-		}
-		else {
-			return false;
-		}
-	}
-
-	@Override
-	public int hashCode() {
-		int result = serializer.hashCode();
-		result = 31 * result + name.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return "AggregatingStateDescriptor{" +
-				"serializer=" + serializer +
-				", aggFunction=" + aggFunction +
-				'}';
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index 261d1fe..c14e4bf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -112,37 +112,6 @@ public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState
 	}
 
 	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		FoldingStateDescriptor<?, ?> that = (FoldingStateDescriptor<?, ?>) o;
-
-		return serializer.equals(that.serializer) && name.equals(that.name);
-
-	}
-
-	@Override
-	public int hashCode() {
-		int result = serializer.hashCode();
-		result = 31 * result + name.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return "FoldingStateDescriptor{" +
-				"serializer=" + serializer +
-				", initialValue=" + defaultValue +
-				", foldFunction=" + foldFunction +
-				'}';
-	}
-
-	@Override
 	public Type getType() {
 		return Type.FOLDING;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index 38e5680..aa5e64b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -102,34 +102,4 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
 	public Type getType() {
 		return Type.LIST;
 	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		final ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
-		return serializer.equals(that.serializer) && name.equals(that.name);
-
-	}
-
-	@Override
-	public int hashCode() {
-		int result = serializer.hashCode();
-		result = 31 * result + name.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return "ListStateDescriptor{" +
-				"serializer=" + serializer +
-				'}';
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
index 087cb54..42b016a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
@@ -117,33 +117,4 @@ public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>
 
 		return ((MapSerializer<UK, UV>) rawSerializer).getValueSerializer();
 	}
-
-	@Override
-	public int hashCode() {
-		int result = serializer.hashCode();
-		result = 31 * result + name.hashCode();
-		return result;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		MapStateDescriptor<?, ?> that = (MapStateDescriptor<?, ?>) o;
-		return serializer.equals(that.serializer) && name.equals(that.name);
-	}
-
-	@Override
-	public String toString() {
-		return "MapStateDescriptor{" +
-				"name=" + name +
-				", serializer=" + serializer +
-				'}';
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
index ef483e2..0df1c2c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
@@ -98,36 +98,6 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>
 	}
 
 	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		ReducingStateDescriptor<?> that = (ReducingStateDescriptor<?>) o;
-
-		return serializer.equals(that.serializer) && name.equals(that.name);
-
-	}
-
-	@Override
-	public int hashCode() {
-		int result = serializer.hashCode();
-		result = 31 * result + name.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return "ReducingStateDescriptor{" +
-				"serializer=" + serializer +
-				", reduceFunction=" + reduceFunction +
-				'}';
-	}
-
-	@Override
 	public Type getType() {
 		return Type.REDUCING;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 574c836..9b6b51d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -273,10 +273,23 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
 	// ------------------------------------------------------------------------
 
 	@Override
-	public abstract int hashCode();
+	public final int hashCode() {
+		return name.hashCode() + 31 * getClass().hashCode();
+	}
 
 	@Override
-	public abstract boolean equals(Object o);
+	public final boolean equals(Object o) {
+		if (o == this) {
+			return true;
+		}
+		else if (o != null && o.getClass() == this.getClass()) {
+			final StateDescriptor<?, ?> that = (StateDescriptor<?, ?>) o;
+			return this.name.equals(that.name);
+		}
+		else {
+			return false;
+		}
+	}
 
 	@Override
 	public String toString() {

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
index ef18d74..4d69d81 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -130,37 +130,6 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
 	}
 
 	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		ValueStateDescriptor<?> that = (ValueStateDescriptor<?>) o;
-
-		return serializer.equals(that.serializer) && name.equals(that.name);
-
-	}
-
-	@Override
-	public int hashCode() {
-		int result = serializer.hashCode();
-		result = 31 * result + name.hashCode();
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return "ValueStateDescriptor{" +
-				"name=" + name +
-				", defaultValue=" + defaultValue +
-				", serializer=" + serializer +
-				'}';
-	}
-
-	@Override
 	public Type getType() {
 		return Type.VALUE;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
index b934ee0..cb6f608 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.state;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
@@ -62,6 +63,33 @@ public class ListStateDescriptorTest {
 		assertEquals(serializer, copy.getElementSerializer());
 	}
 
+	@Test
+	public void testHashCodeEquals() throws Exception {
+		final String name = "testName";
+
+		ListStateDescriptor<String> original = new ListStateDescriptor<>(name, String.class);
+		ListStateDescriptor<String> same = new ListStateDescriptor<>(name, String.class);
+		ListStateDescriptor<String> sameBySerializer = new ListStateDescriptor<>(name, StringSerializer.INSTANCE);
+
+		// test that hashCode() works on state descriptors with initialized and uninitialized serializers
+		assertEquals(original.hashCode(), same.hashCode());
+		assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+		assertEquals(original, same);
+		assertEquals(original, sameBySerializer);
+
+		// equality with a clone
+		ListStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(original);
+		assertEquals(original, clone);
+
+		// equality with an initialized clone
+		clone.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, clone);
+
+		original.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, same);
+	}
+
 	/**
 	 * FLINK-6775.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
index 4e64c0f..069d6c2 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.state;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.testutils.CommonTestUtils;
 
@@ -67,6 +68,34 @@ public class MapStateDescriptorTest {
 		assertEquals(valueSerializer, copy.getValueSerializer());
 	}
 
+	@Test
+	public void testHashCodeEquals() throws Exception {
+		final String name = "testName";
+
+		MapStateDescriptor<String, String> original = new MapStateDescriptor<>(name, String.class, String.class);
+		MapStateDescriptor<String, String> same = new MapStateDescriptor<>(name, String.class, String.class);
+		MapStateDescriptor<String, String> sameBySerializer =
+				new MapStateDescriptor<>(name, StringSerializer.INSTANCE, StringSerializer.INSTANCE);
+
+		// test that hashCode() works on state descriptors with initialized and uninitialized serializers
+		assertEquals(original.hashCode(), same.hashCode());
+		assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+		assertEquals(original, same);
+		assertEquals(original, sameBySerializer);
+
+		// equality with a clone
+		MapStateDescriptor<String, String> clone = CommonTestUtils.createCopySerializable(original);
+		assertEquals(original, clone);
+
+		// equality with an initialized clone
+		clone.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, clone);
+
+		original.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, same);
+	}
+
 	/**
 	 * FLINK-6775.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
index 5d9eba5..89aa1e6 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.state;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.TestLogger;
@@ -56,4 +57,32 @@ public class ReducingStateDescriptorTest extends TestLogger {
 		assertNotNull(copy.getSerializer());
 		assertEquals(serializer, copy.getSerializer());
 	}
+
+	@Test
+	public void testHashCodeEquals() throws Exception {
+		final String name = "testName";
+		final ReduceFunction<String> reducer = (a, b) -> a;
+
+		ReducingStateDescriptor<String> original = new ReducingStateDescriptor<>(name, reducer, String.class);
+		ReducingStateDescriptor<String> same = new ReducingStateDescriptor<>(name, reducer, String.class);
+		ReducingStateDescriptor<String> sameBySerializer = new ReducingStateDescriptor<>(name, reducer, StringSerializer.INSTANCE);
+
+		// test that hashCode() works on state descriptors with initialized and uninitialized serializers
+		assertEquals(original.hashCode(), same.hashCode());
+		assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+		assertEquals(original, same);
+		assertEquals(original, sameBySerializer);
+
+		// equality with a clone
+		ReducingStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(original);
+		assertEquals(original, clone);
+
+		// equality with an initialized clone
+		clone.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, clone);
+
+		original.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, same);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
index cf5327e..3958baa 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
@@ -32,6 +32,7 @@ import java.io.File;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
@@ -159,6 +160,47 @@ public class StateDescriptorTest {
 	}
 
 	// ------------------------------------------------------------------------
+	//  Test hashCode() and equals()
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testHashCodeAndEquals() throws Exception {
+		final String name = "testName";
+
+		TestStateDescriptor<String> original = new TestStateDescriptor<>(name, String.class);
+		TestStateDescriptor<String> same = new TestStateDescriptor<>(name, String.class);
+		TestStateDescriptor<String> sameBySerializer = new TestStateDescriptor<>(name, StringSerializer.INSTANCE);
+
+		// test that hashCode() works on state descriptors with initialized and uninitialized serializers
+		assertEquals(original.hashCode(), same.hashCode());
+		assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+		assertEquals(original, same);
+		assertEquals(original, sameBySerializer);
+
+		// equality with a clone
+		TestStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(original);
+		assertEquals(original, clone);
+
+		// equality with an initialized clone
+		clone.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, clone);
+
+		original.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, same);
+	}
+
+	@Test
+	public void testEqualsSameNameAndTypeDifferentClass() throws Exception {
+		final String name = "test name";
+
+		final TestStateDescriptor<String> descr1 = new TestStateDescriptor<>(name, String.class);
+		final OtherTestStateDescriptor<String> descr2 = new OtherTestStateDescriptor<>(name, String.class);
+
+		assertNotEquals(descr1, descr2);
+	}
+
+	// ------------------------------------------------------------------------
 	//  Mock implementations and test types
 	// ------------------------------------------------------------------------
 
@@ -185,17 +227,34 @@ public class StateDescriptorTest {
 
 		@Override
 		public Type getType() {
-			throw new UnsupportedOperationException();
+			return Type.VALUE;
+		}
+	}
+
+	private static class OtherTestStateDescriptor<T> extends StateDescriptor<State, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		OtherTestStateDescriptor(String name, TypeSerializer<T> serializer) {
+			super(name, serializer, null);
+		}
+
+		OtherTestStateDescriptor(String name, TypeInformation<T> typeInfo) {
+			super(name, typeInfo, null);
+		}
+
+		OtherTestStateDescriptor(String name, Class<T> type) {
+			super(name, type, null);
 		}
 
 		@Override
-		public int hashCode() {
-			return 584523;
+		public State bind(StateBinder stateBinder) throws Exception {
+			throw new UnsupportedOperationException();
 		}
 
 		@Override
-		public boolean equals(Object o) {
-			return o != null && o.getClass() == TestStateDescriptor.class;
+		public Type getType() {
+			return Type.VALUE;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
index 67114e5..3870da0 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.testutils.CommonTestUtils;
@@ -36,6 +37,33 @@ import static org.junit.Assert.assertNotNull;
 public class ValueStateDescriptorTest extends TestLogger {
 
 	@Test
+	public void testHashCodeEquals() throws Exception {
+		final String name = "testName";
+
+		ValueStateDescriptor<String> original = new ValueStateDescriptor<>(name, String.class);
+		ValueStateDescriptor<String> same = new ValueStateDescriptor<>(name, String.class);
+		ValueStateDescriptor<String> sameBySerializer = new ValueStateDescriptor<>(name, StringSerializer.INSTANCE);
+
+		// test that hashCode() works on state descriptors with initialized and uninitialized serializers
+		assertEquals(original.hashCode(), same.hashCode());
+		assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+		assertEquals(original, same);
+		assertEquals(original, sameBySerializer);
+
+		// equality with a clone
+		ValueStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(original);
+		assertEquals(original, clone);
+
+		// equality with an initialized clone
+		clone.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, clone);
+
+		original.initializeSerializerUnlessSet(new ExecutionConfig());
+		assertEquals(original, same);
+	}
+
+	@Test
 	public void testVeryLargeDefaultValue() throws Exception {
 		// ensure that we correctly read very large data when deserializing the default value