You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/03/22 15:22:54 UTC
[7/7] flink git commit: [FLINK-9035] [core] Fix state descriptor equals() and hashCode() handling
[FLINK-9035] [core] Fix state descriptor equals() and hashCode() handling
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69e5d146
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69e5d146
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69e5d146
Branch: refs/heads/release-1.5
Commit: 69e5d146219bbded4bb6cc472ff015996c1aceb7
Parents: effe7d7
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 20 17:16:06 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 22 16:04:49 2018 +0100
----------------------------------------------------------------------
.../state/AggregatingStateDescriptor.java | 31 ---------
.../common/state/FoldingStateDescriptor.java | 31 ---------
.../api/common/state/ListStateDescriptor.java | 30 ---------
.../api/common/state/MapStateDescriptor.java | 29 --------
.../common/state/ReducingStateDescriptor.java | 30 ---------
.../flink/api/common/state/StateDescriptor.java | 17 ++++-
.../api/common/state/ValueStateDescriptor.java | 31 ---------
.../common/state/ListStateDescriptorTest.java | 28 ++++++++
.../common/state/MapStateDescriptorTest.java | 29 ++++++++
.../state/ReducingStateDescriptorTest.java | 29 ++++++++
.../api/common/state/StateDescriptorTest.java | 69 ++++++++++++++++++--
.../common/state/ValueStateDescriptorTest.java | 28 ++++++++
12 files changed, 193 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
index 6f6d2f9..8c7fed6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AggregatingStateDescriptor.java
@@ -111,35 +111,4 @@ public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<Ag
public Type getType() {
return Type.AGGREGATING;
}
-
- // ------------------------------------------------------------------------
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- else if (o != null && getClass() == o.getClass()) {
- AggregatingStateDescriptor<?, ?, ?> that = (AggregatingStateDescriptor<?, ?, ?>) o;
- return serializer.equals(that.serializer) && name.equals(that.name);
- }
- else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- int result = serializer.hashCode();
- result = 31 * result + name.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return "AggregatingStateDescriptor{" +
- "serializer=" + serializer +
- ", aggFunction=" + aggFunction +
- '}';
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index 261d1fe..c14e4bf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -112,37 +112,6 @@ public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState
}
@Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- FoldingStateDescriptor<?, ?> that = (FoldingStateDescriptor<?, ?>) o;
-
- return serializer.equals(that.serializer) && name.equals(that.name);
-
- }
-
- @Override
- public int hashCode() {
- int result = serializer.hashCode();
- result = 31 * result + name.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return "FoldingStateDescriptor{" +
- "serializer=" + serializer +
- ", initialValue=" + defaultValue +
- ", foldFunction=" + foldFunction +
- '}';
- }
-
- @Override
public Type getType() {
return Type.FOLDING;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index 38e5680..aa5e64b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -102,34 +102,4 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T
public Type getType() {
return Type.LIST;
}
-
- // ------------------------------------------------------------------------
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- final ListStateDescriptor<?> that = (ListStateDescriptor<?>) o;
- return serializer.equals(that.serializer) && name.equals(that.name);
-
- }
-
- @Override
- public int hashCode() {
- int result = serializer.hashCode();
- result = 31 * result + name.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return "ListStateDescriptor{" +
- "serializer=" + serializer +
- '}';
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
index 087cb54..42b016a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
@@ -117,33 +117,4 @@ public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>
return ((MapSerializer<UK, UV>) rawSerializer).getValueSerializer();
}
-
- @Override
- public int hashCode() {
- int result = serializer.hashCode();
- result = 31 * result + name.hashCode();
- return result;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- MapStateDescriptor<?, ?> that = (MapStateDescriptor<?, ?>) o;
- return serializer.equals(that.serializer) && name.equals(that.name);
- }
-
- @Override
- public String toString() {
- return "MapStateDescriptor{" +
- "name=" + name +
- ", serializer=" + serializer +
- '}';
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
index ef483e2..0df1c2c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
@@ -98,36 +98,6 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>
}
@Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- ReducingStateDescriptor<?> that = (ReducingStateDescriptor<?>) o;
-
- return serializer.equals(that.serializer) && name.equals(that.name);
-
- }
-
- @Override
- public int hashCode() {
- int result = serializer.hashCode();
- result = 31 * result + name.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return "ReducingStateDescriptor{" +
- "serializer=" + serializer +
- ", reduceFunction=" + reduceFunction +
- '}';
- }
-
- @Override
public Type getType() {
return Type.REDUCING;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 574c836..9b6b51d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -273,10 +273,23 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
// ------------------------------------------------------------------------
@Override
- public abstract int hashCode();
+ public final int hashCode() {
+ return name.hashCode() + 31 * getClass().hashCode();
+ }
@Override
- public abstract boolean equals(Object o);
+ public final boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ else if (o != null && o.getClass() == this.getClass()) {
+ final StateDescriptor<?, ?> that = (StateDescriptor<?, ?>) o;
+ return this.name.equals(that.name);
+ }
+ else {
+ return false;
+ }
+ }
@Override
public String toString() {
http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
index ef18d74..4d69d81 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -130,37 +130,6 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
}
@Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- ValueStateDescriptor<?> that = (ValueStateDescriptor<?>) o;
-
- return serializer.equals(that.serializer) && name.equals(that.name);
-
- }
-
- @Override
- public int hashCode() {
- int result = serializer.hashCode();
- result = 31 * result + name.hashCode();
- return result;
- }
-
- @Override
- public String toString() {
- return "ValueStateDescriptor{" +
- "name=" + name +
- ", defaultValue=" + defaultValue +
- ", serializer=" + serializer +
- '}';
- }
-
- @Override
public Type getType() {
return Type.VALUE;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
index b934ee0..cb6f608 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDescriptorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.state;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.testutils.CommonTestUtils;
@@ -62,6 +63,33 @@ public class ListStateDescriptorTest {
assertEquals(serializer, copy.getElementSerializer());
}
+ @Test
+ public void testHashCodeEquals() throws Exception {
+ final String name = "testName";
+
+ ListStateDescriptor<String> original = new ListStateDescriptor<>(name, String.class);
+ ListStateDescriptor<String> same = new ListStateDescriptor<>(name, String.class);
+ ListStateDescriptor<String> sameBySerializer = new ListStateDescriptor<>(name, StringSerializer.INSTANCE);
+
+ // test that hashCode() works on state descriptors with initialized and uninitialized serializers
+ assertEquals(original.hashCode(), same.hashCode());
+ assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+ assertEquals(original, same);
+ assertEquals(original, sameBySerializer);
+
+ // equality with a clone
+ ListStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(original);
+ assertEquals(original, clone);
+
+ // equality with an initialized
+ clone.initializeSerializerUnlessSet(new ExecutionConfig());
+ assertEquals(original, clone);
+
+ original.initializeSerializerUnlessSet(new ExecutionConfig());
+ assertEquals(original, same);
+ }
+
/**
* FLINK-6775.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
index 4e64c0f..069d6c2 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDescriptorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.state;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.testutils.CommonTestUtils;
@@ -67,6 +68,34 @@ public class MapStateDescriptorTest {
assertEquals(valueSerializer, copy.getValueSerializer());
}
+ @Test
+ public void testHashCodeEquals() throws Exception {
+ final String name = "testName";
+
+ MapStateDescriptor<String, String> original = new MapStateDescriptor<>(name, String.class, String.class);
+ MapStateDescriptor<String, String> same = new MapStateDescriptor<>(name, String.class, String.class);
+ MapStateDescriptor<String, String> sameBySerializer =
+ new MapStateDescriptor<>(name, StringSerializer.INSTANCE, StringSerializer.INSTANCE);
+
+ // test that hashCode() works on state descriptors with initialized and uninitialized serializers
+ assertEquals(original.hashCode(), same.hashCode());
+ assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+ assertEquals(original, same);
+ assertEquals(original, sameBySerializer);
+
+ // equality with a clone
+ MapStateDescriptor<String, String> clone = CommonTestUtils.createCopySerializable(original);
+ assertEquals(original, clone);
+
+ // equality with an initialized
+ clone.initializeSerializerUnlessSet(new ExecutionConfig());
+ assertEquals(original, clone);
+
+ original.initializeSerializerUnlessSet(new ExecutionConfig());
+ assertEquals(original, same);
+ }
+
/**
* FLINK-6775.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
index 5d9eba5..89aa1e6 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDescriptorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.state;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.util.TestLogger;
@@ -56,4 +57,32 @@ public class ReducingStateDescriptorTest extends TestLogger {
assertNotNull(copy.getSerializer());
assertEquals(serializer, copy.getSerializer());
}
+
+ @Test
+ public void testHashCodeEquals() throws Exception {
+ final String name = "testName";
+ final ReduceFunction<String> reducer = (a, b) -> a;
+
+ ReducingStateDescriptor<String> original = new ReducingStateDescriptor<>(name, reducer, String.class);
+ ReducingStateDescriptor<String> same = new ReducingStateDescriptor<>(name, reducer, String.class);
+ ReducingStateDescriptor<String> sameBySerializer = new ReducingStateDescriptor<>(name, reducer, StringSerializer.INSTANCE);
+
+ // test that hashCode() works on state descriptors with initialized and uninitialized serializers
+ assertEquals(original.hashCode(), same.hashCode());
+ assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+ assertEquals(original, same);
+ assertEquals(original, sameBySerializer);
+
+ // equality with a clone
+ ReducingStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(original);
+ assertEquals(original, clone);
+
+ // equality with an initialized
+ clone.initializeSerializerUnlessSet(new ExecutionConfig());
+ assertEquals(original, clone);
+
+ original.initializeSerializerUnlessSet(new ExecutionConfig());
+ assertEquals(original, same);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
index cf5327e..3958baa 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
@@ -32,6 +32,7 @@ import java.io.File;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
@@ -159,6 +160,47 @@ public class StateDescriptorTest {
}
// ------------------------------------------------------------------------
+ // Test hashCode() and equals()
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testHashCodeAndEquals() throws Exception {
+ final String name = "testName";
+
+ TestStateDescriptor<String> original = new TestStateDescriptor<>(name, String.class);
+ TestStateDescriptor<String> same = new TestStateDescriptor<>(name, String.class);
+ TestStateDescriptor<String> sameBySerializer = new TestStateDescriptor<>(name, StringSerializer.INSTANCE);
+
+ // test that hashCode() works on state descriptors with initialized and uninitialized serializers
+ assertEquals(original.hashCode(), same.hashCode());
+ assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+ assertEquals(original, same);
+ assertEquals(original, sameBySerializer);
+
+ // equality with a clone
+ TestStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(original);
+ assertEquals(original, clone);
+
+ // equality with an initialized
+ clone.initializeSerializerUnlessSet(new ExecutionConfig());
+ assertEquals(original, clone);
+
+ original.initializeSerializerUnlessSet(new ExecutionConfig());
+ assertEquals(original, same);
+ }
+
+ @Test
+ public void testEqualsSameNameAndTypeDifferentClass() throws Exception {
+ final String name = "test name";
+
+ final TestStateDescriptor<String> descr1 = new TestStateDescriptor<>(name, String.class);
+ final OtherTestStateDescriptor<String> descr2 = new OtherTestStateDescriptor<>(name, String.class);
+
+ assertNotEquals(descr1, descr2);
+ }
+
+ // ------------------------------------------------------------------------
// Mock implementations and test types
// ------------------------------------------------------------------------
@@ -185,17 +227,34 @@ public class StateDescriptorTest {
@Override
public Type getType() {
- throw new UnsupportedOperationException();
+ return Type.VALUE;
+ }
+ }
+
+ private static class OtherTestStateDescriptor<T> extends StateDescriptor<State, T> {
+
+ private static final long serialVersionUID = 1L;
+
+ OtherTestStateDescriptor(String name, TypeSerializer<T> serializer) {
+ super(name, serializer, null);
+ }
+
+ OtherTestStateDescriptor(String name, TypeInformation<T> typeInfo) {
+ super(name, typeInfo, null);
+ }
+
+ OtherTestStateDescriptor(String name, Class<T> type) {
+ super(name, type, null);
}
@Override
- public int hashCode() {
- return 584523;
+ public State bind(StateBinder stateBinder) throws Exception {
+ throw new UnsupportedOperationException();
}
@Override
- public boolean equals(Object o) {
- return o != null && o.getClass() == TestStateDescriptor.class;
+ public Type getType() {
+ return Type.VALUE;
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/69e5d146/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
index 67114e5..3870da0 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDescriptorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.state;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.testutils.CommonTestUtils;
@@ -36,6 +37,33 @@ import static org.junit.Assert.assertNotNull;
public class ValueStateDescriptorTest extends TestLogger {
@Test
+ public void testHashCodeEquals() throws Exception {
+ final String name = "testName";
+
+ ValueStateDescriptor<String> original = new ValueStateDescriptor<>(name, String.class);
+ ValueStateDescriptor<String> same = new ValueStateDescriptor<>(name, String.class);
+ ValueStateDescriptor<String> sameBySerializer = new ValueStateDescriptor<>(name, StringSerializer.INSTANCE);
+
+ // test that hashCode() works on state descriptors with initialized and uninitialized serializers
+ assertEquals(original.hashCode(), same.hashCode());
+ assertEquals(original.hashCode(), sameBySerializer.hashCode());
+
+ assertEquals(original, same);
+ assertEquals(original, sameBySerializer);
+
+ // equality with a clone
+ ValueStateDescriptor<String> clone = CommonTestUtils.createCopySerializable(original);
+ assertEquals(original, clone);
+
+ // equality with an initialized
+ clone.initializeSerializerUnlessSet(new ExecutionConfig());
+ assertEquals(original, clone);
+
+ original.initializeSerializerUnlessSet(new ExecutionConfig());
+ assertEquals(original, same);
+ }
+
+ @Test
public void testVeryLargeDefaultValue() throws Exception {
// ensure that we correctly read very large data when deserializing the default value