You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/17 13:24:07 UTC
[4/9] flink git commit: [FLINK-2637] [api-breaking] [types] Adds
equals and hashCode method to TypeInformations and TypeSerializers - Fixes
ObjectArrayTypeInfo - Makes CompositeTypes serializable - Adds test for
equality relation's symmetric property - A
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/EnumTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/EnumTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/EnumTypeInfoTest.java
new file mode 100644
index 0000000..b200566
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/EnumTypeInfoTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/** Verifies the {@code equals}/{@code hashCode} contract of {@link EnumTypeInfo}. */
+public class EnumTypeInfoTest extends TestLogger {
+    /** Enum the reference type infos are built for. */
+    enum TestEnum { ONE, TWO }
+
+    /** Separate enum class declaring the same constant names as {@link TestEnum}. */
+    enum AlternativeEnum { ONE, TWO }
+
+    /** Type infos created for the same enum class are equal and share a hash code. */
+    @Test
+    public void testEnumTypeEquality() {
+        EnumTypeInfo<TestEnum> left = new EnumTypeInfo<TestEnum>(TestEnum.class);
+        EnumTypeInfo<TestEnum> right = new EnumTypeInfo<TestEnum>(TestEnum.class);
+
+        assertEquals(left, right);
+        assertEquals(left.hashCode(), right.hashCode());
+    }
+
+    /** Type infos created for different enum classes are not equal. */
+    @Test
+    public void testEnumTypeInequality() {
+        EnumTypeInfo<TestEnum> testEnumInfo = new EnumTypeInfo<TestEnum>(TestEnum.class);
+        EnumTypeInfo<AlternativeEnum> alternativeEnumInfo =
+                new EnumTypeInfo<AlternativeEnum>(AlternativeEnum.class);
+        assertNotEquals(testEnumInfo, alternativeEnumInfo);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/GenericTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/GenericTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/GenericTypeInfoTest.java
new file mode 100644
index 0000000..fad43df
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/GenericTypeInfoTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/** Verifies the {@code equals}/{@code hashCode} contract of {@link GenericTypeInfo}. */
+public class GenericTypeInfoTest extends TestLogger {
+    // Two distinct, empty classes to create type infos for.
+    static class TestClass {}
+    static class AlternativeClass {}
+
+    /** Type infos created for the same class are equal and share a hash code. */
+    @Test
+    public void testGenericTypeInfoEquality() {
+        GenericTypeInfo<TestClass> first = new GenericTypeInfo<>(TestClass.class);
+        GenericTypeInfo<TestClass> second = new GenericTypeInfo<>(TestClass.class);
+        assertEquals(first, second);
+        assertEquals(first.hashCode(), second.hashCode());
+    }
+
+    /** Type infos created for different classes are not equal. */
+    @Test
+    public void testGenericTypeInfoInequality() {
+        GenericTypeInfo<TestClass> testClassInfo = new GenericTypeInfo<>(TestClass.class);
+        GenericTypeInfo<AlternativeClass> alternativeInfo = new GenericTypeInfo<>(AlternativeClass.class);
+        assertNotEquals(testClassInfo, alternativeInfo);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/MissingTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/MissingTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/MissingTypeInfoTest.java
new file mode 100644
index 0000000..ee57475
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/MissingTypeInfoTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/** Verifies the {@code equals}/{@code hashCode} contract of {@link MissingTypeInfo}. */
+public class MissingTypeInfoTest extends TestLogger {
+    static final String functionName = "foobar";
+    static final InvalidTypesException testException = new InvalidTypesException("Test exception.");
+    /** Same function name and same exception instance: equal, with matching hash codes. */
+    @Test
+    public void testMissingTypeInfoEquality() {
+        MissingTypeInfo first = new MissingTypeInfo(functionName, testException);
+        MissingTypeInfo second = new MissingTypeInfo(functionName, testException);
+        assertEquals(first, second);
+        assertEquals(first.hashCode(), second.hashCode());
+    }
+
+    /** A different function name with the same exception instance: not equal. */
+    @Test
+    public void testMissingTypeInfoInequality() {
+        MissingTypeInfo original = new MissingTypeInfo(functionName, testException);
+        MissingTypeInfo renamed = new MissingTypeInfo("alt" + functionName, testException);
+        assertNotEquals(original, renamed);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfoTest.java
new file mode 100644
index 0000000..f3b39c0
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfoTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+import static org.junit.Assert.*;
+
+/** Verifies the {@code equals}/{@code hashCode} contract of {@link ObjectArrayTypeInfo}. */
+public class ObjectArrayTypeInfoTest extends TestLogger {
+
+    public static class TestClass{}
+
+    /** Same array class and equal component infos: equal, with matching hash codes. */
+    @Test
+    public void testObjectArrayTypeInfoEquality() {
+        ObjectArrayTypeInfo<TestClass[], TestClass> first = ObjectArrayTypeInfo.getInfoFor(
+                TestClass[].class, new GenericTypeInfo<TestClass>(TestClass.class));
+
+        ObjectArrayTypeInfo<TestClass[], TestClass> second = ObjectArrayTypeInfo.getInfoFor(
+                TestClass[].class, new GenericTypeInfo<TestClass>(TestClass.class));
+
+        assertEquals(first, second);
+        assertEquals(first.hashCode(), second.hashCode());
+    }
+
+    /** Same array class but a different kind of component info: not equal. */
+    @Test
+    public void testObjectArrayTypeInfoInequality() {
+        ObjectArrayTypeInfo<TestClass[], TestClass> genericBased = ObjectArrayTypeInfo.getInfoFor(
+                TestClass[].class, new GenericTypeInfo<TestClass>(TestClass.class));
+
+        // The component is described by a PojoTypeInfo with an empty field list instead.
+        ObjectArrayTypeInfo<TestClass[], TestClass> pojoBased = ObjectArrayTypeInfo.getInfoFor(
+                TestClass[].class, new PojoTypeInfo<TestClass>(TestClass.class, new ArrayList<PojoField>()));
+
+        assertNotEquals(genericBased, pojoBased);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
index 12b7913..2fe1357 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeInfoTest.java
@@ -21,12 +21,15 @@ package org.apache.flink.api.java.typeutils;
import static org.junit.Assert.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.InstantiationUtil;
import org.junit.Test;
+import java.io.IOException;
+
public class PojoTypeInfoTest {
@Test
- public void testEquals() {
+ public void testPojoTypeInfoEquality() {
try {
TypeInformation<TestPojo> info1 = TypeExtractor.getForClass(TestPojo.class);
TypeInformation<TestPojo> info2 = TypeExtractor.getForClass(TestPojo.class);
@@ -42,6 +45,35 @@ public class PojoTypeInfoTest {
fail(e.getMessage());
}
}
+
+ @Test
+ public void testPojoTypeInfoInequality() {
+ try {
+ TypeInformation<TestPojo> info1 = TypeExtractor.getForClass(TestPojo.class);
+ TypeInformation<AlternatePojo> info2 = TypeExtractor.getForClass(AlternatePojo.class);
+
+ assertTrue(info1 instanceof PojoTypeInfo);
+ assertTrue(info2 instanceof PojoTypeInfo);
+
+ assertFalse(info1.equals(info2));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSerializabilityOfPojoTypeInfo() throws IOException, ClassNotFoundException {
+ PojoTypeInfo<TestPojo> pojoTypeInfo = (PojoTypeInfo<TestPojo>)TypeExtractor.getForClass(TestPojo.class);
+
+ byte[] serializedPojoTypeInfo = InstantiationUtil.serializeObject(pojoTypeInfo);
+ PojoTypeInfo<TestPojo> deserializedPojoTypeInfo = (PojoTypeInfo<TestPojo>)InstantiationUtil.deserializeObject(
+ serializedPojoTypeInfo,
+ getClass().getClassLoader());
+
+ assertEquals(pojoTypeInfo, deserializedPojoTypeInfo);
+ }
public static final class TestPojo {
@@ -60,4 +92,22 @@ public class PojoTypeInfoTest {
return aString;
}
}
+
+ // Second fixture class for testPojoTypeInfoInequality: a class distinct from TestPojo.
+ public static final class AlternatePojo {
+
+ public int someInt;
+
+ private String aString;
+
+ public Double[] doubleArray;
+
+ public void setaString(String aString) {
+ this.aString = aString;
+ }
+
+ public String getaString() {
+ return aString;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/RecordTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/RecordTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/RecordTypeInfoTest.java
new file mode 100644
index 0000000..7aeb062
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/RecordTypeInfoTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/** Verifies the {@code equals}/{@code hashCode} contract of {@link RecordTypeInfo}. */
+public class RecordTypeInfoTest extends TestLogger {
+    /** Any two {@link RecordTypeInfo} instances are equal and share a hash code. */
+    @Test
+    public void testRecordTypeInfoEquality() {
+        RecordTypeInfo first = new RecordTypeInfo();
+        RecordTypeInfo second = new RecordTypeInfo();
+        assertEquals(first, second);
+        assertEquals(first.hashCode(), second.hashCode());
+    }
+
+    /** A {@link RecordTypeInfo} is not equal to a type info of another kind. */
+    @Test
+    public void testRecordTypeInfoInequality() {
+        RecordTypeInfo recordInfo = new RecordTypeInfo();
+        MissingTypeInfo missingInfo = new MissingTypeInfo("foobar");
+        assertNotEquals(recordInfo, missingInfo);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java
new file mode 100644
index 0000000..b6cff34
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TupleTypeInfoTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Verifies the {@code equals}/{@code hashCode} contract of {@link TupleTypeInfo}. */
+public class TupleTypeInfoTest extends TestLogger {
+
+    /** {@code equals} must answer the same in both directions for two {@link TupleTypeInfoBase} subclasses. */
+    @Test
+    public void testTupleTypeInfoSymmetricEqualityRelation() {
+        TupleTypeInfo<Tuple1<Integer>> tupleTypeInfo = new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO);
+
+        // Anonymous subclass over the same tuple class and field type; its overrides
+        // only return placeholder values (null, an empty array, 0).
+        TupleTypeInfoBase<Tuple1> anonymousTupleTypeInfo = new TupleTypeInfoBase<Tuple1>(
+                (Class<Tuple1>)Tuple1.class,
+                (TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO) {
+
+            private static final long serialVersionUID = -7985593598027660836L;
+
+            @Override
+            public TypeSerializer<Tuple1> createSerializer(ExecutionConfig config) {
+                return null;
+            }
+
+            @Override
+            protected TypeComparatorBuilder<Tuple1> createTypeComparatorBuilder() {
+                return null;
+            }
+
+            @Override
+            public String[] getFieldNames() {
+                return new String[0];
+            }
+
+            @Override
+            public int getFieldIndex(String fieldName) {
+                return 0;
+            }
+        };
+
+        boolean tupleVsAnonymous = tupleTypeInfo.equals(anonymousTupleTypeInfo);
+        boolean anonymousVsTuple = anonymousTupleTypeInfo.equals(tupleTypeInfo);
+
+        // assertEquals reports both results on failure, which assertTrue(a == b) would hide.
+        Assert.assertEquals("Equality relation should be symmetric", tupleVsAnonymous, anonymousVsTuple);
+    }
+
+    /** Tuple type infos over the same field types are equal and share a hash code. */
+    @Test
+    public void testTupleTypeInfoEquality() {
+        TupleTypeInfo<Tuple2<Integer, String>> first =
+                new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+        TupleTypeInfo<Tuple2<Integer, String>> second =
+                new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+        Assert.assertEquals(first, second);
+        Assert.assertEquals(first.hashCode(), second.hashCode());
+    }
+
+    /** Tuple type infos that differ in one field type are not equal. */
+    @Test
+    public void testTupleTypeInfoInequality() {
+        TupleTypeInfo<Tuple2<Integer, String>> intAndString =
+                new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+        TupleTypeInfo<Tuple2<Integer, Boolean>> intAndBoolean =
+                new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);
+        Assert.assertNotEquals(intAndString, intAndBoolean);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
index 9fe8174..e225460 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
@@ -181,14 +181,14 @@ public class TypeInfoParserTest {
+ ">");
Assert.assertTrue(ti instanceof PojoTypeInfo);
PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
- Assert.assertEquals("array", pti.getPojoFieldAt(0).field.getName());
- Assert.assertTrue(pti.getPojoFieldAt(0).type instanceof BasicArrayTypeInfo);
- Assert.assertEquals("basic", pti.getPojoFieldAt(1).field.getName());
- Assert.assertTrue(pti.getPojoFieldAt(1).type instanceof BasicTypeInfo);
- Assert.assertEquals("hadoopCitizen", pti.getPojoFieldAt(2).field.getName());
- Assert.assertTrue(pti.getPojoFieldAt(2).type instanceof WritableTypeInfo);
- Assert.assertEquals("tuple", pti.getPojoFieldAt(3).field.getName());
- Assert.assertTrue(pti.getPojoFieldAt(3).type instanceof TupleTypeInfo);
+ Assert.assertEquals("array", pti.getPojoFieldAt(0).getField().getName());
+ Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicArrayTypeInfo);
+ Assert.assertEquals("basic", pti.getPojoFieldAt(1).getField().getName());
+ Assert.assertTrue(pti.getPojoFieldAt(1).getTypeInformation() instanceof BasicTypeInfo);
+ Assert.assertEquals("hadoopCitizen", pti.getPojoFieldAt(2).getField().getName());
+ Assert.assertTrue(pti.getPojoFieldAt(2).getTypeInformation() instanceof WritableTypeInfo);
+ Assert.assertEquals("tuple", pti.getPojoFieldAt(3).getField().getName());
+ Assert.assertTrue(pti.getPojoFieldAt(3).getTypeInformation() instanceof TupleTypeInfo);
}
@Test
@@ -198,12 +198,12 @@ public class TypeInfoParserTest {
TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
Assert.assertTrue(tti.getTypeAt(0) instanceof BasicTypeInfo);
Assert.assertTrue(tti.getTypeAt(1) instanceof TupleTypeInfo);
- TupleTypeInfo<?> tti2 = (TupleTypeInfo<?>) tti.getTypeAt(1);
+ TupleTypeInfo<?> tti2 = (TupleTypeInfo<?>)(Object)tti.getTypeAt(1);
Assert.assertTrue(tti2.getTypeAt(0) instanceof BasicTypeInfo);
Assert.assertTrue(tti2.getTypeAt(1) instanceof PojoTypeInfo);
PojoTypeInfo<?> pti = (PojoTypeInfo<?>) tti2.getTypeAt(1);
- Assert.assertEquals("basic", pti.getPojoFieldAt(0).field.getName());
- Assert.assertTrue(pti.getPojoFieldAt(0).type instanceof BasicTypeInfo);
+ Assert.assertEquals("basic", pti.getPojoFieldAt(0).getField().getName());
+ Assert.assertTrue(pti.getPojoFieldAt(0).getTypeInformation() instanceof BasicTypeInfo);
}
public static class MyWritable implements Writable {
@@ -232,7 +232,7 @@ public class TypeInfoParserTest {
TypeInformation<?> ti = TypeInfoParser.parse("java.lang.Class[]");
Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?, ?>);
- Assert.assertEquals(Class.class, ((ObjectArrayTypeInfo<?, ?>) ti).getComponentType());
+ Assert.assertEquals(Class.class, ((ObjectArrayTypeInfo<?, ?>) ti).getComponentInfo().getTypeClass());
TypeInformation<?> ti2 = TypeInfoParser.parse("Tuple2<Integer,Double>[]");
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/ValueTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/ValueTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/ValueTypeInfoTest.java
new file mode 100644
index 0000000..4a579c8
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/ValueTypeInfoTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.Record;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+
+/** Verifies the {@code equals}/{@code hashCode} contract of {@link ValueTypeInfo}. */
+public class ValueTypeInfoTest extends TestLogger {
+
+    /** {@link Value} fixture; both serialization methods are intentionally empty. */
+    public static class TestClass implements Value {
+        private static final long serialVersionUID = -492760806806568285L;
+
+        @Override
+        public void write(DataOutputView out) throws IOException {
+        }
+
+        @Override
+        public void read(DataInputView in) throws IOException {
+        }
+    }
+
+    /** Second, distinct {@link Value} fixture; its methods are intentionally empty as well. */
+    public static class AlternativeClass implements Value {
+        private static final long serialVersionUID = -163437084575260172L;
+
+        @Override
+        public void write(DataOutputView out) throws IOException {
+        }
+
+        @Override
+        public void read(DataInputView in) throws IOException {
+        }
+    }
+
+    /** Type infos created for the same value class are equal and share a hash code. */
+    @Test
+    public void testValueTypeInfoEquality() {
+        ValueTypeInfo<TestClass> tpeInfo1 = new ValueTypeInfo<>(TestClass.class);
+        ValueTypeInfo<TestClass> tpeInfo2 = new ValueTypeInfo<>(TestClass.class);
+
+        assertEquals(tpeInfo1, tpeInfo2);
+        assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
+    }
+
+    /** Type infos created for different value classes are not equal. */
+    @Test
+    public void testValueTypeInfoInequality() {
+        ValueTypeInfo<TestClass> tpeInfo1 = new ValueTypeInfo<>(TestClass.class);
+        ValueTypeInfo<AlternativeClass> tpeInfo2 = new ValueTypeInfo<>(AlternativeClass.class);
+
+        assertNotEquals(tpeInfo1, tpeInfo2);
+    }
+
+    /** {@code equals(null)} must return {@code false}. */
+    @Test
+    public void testValueTypeEqualsWithNull() throws Exception {
+        ValueTypeInfo<Record> tpeInfo = new ValueTypeInfo<>(Record.class);
+        Assert.assertFalse(tpeInfo.equals(null));
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
new file mode 100644
index 0000000..2ab0021
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/WritableTypeInfoTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.util.TestLogger;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+
+/** Verifies the {@code equals}/{@code hashCode} contract of {@link WritableTypeInfo}. */
+public class WritableTypeInfoTest extends TestLogger {
+
+    /** {@link Writable} fixture; both serialization methods are intentionally empty. */
+    public static class TestClass implements Writable {
+        @Override
+        public void write(DataOutput dataOutput) throws IOException {
+        }
+
+        @Override
+        public void readFields(DataInput dataInput) throws IOException {
+        }
+    }
+
+    /** Second, distinct {@link Writable} fixture; its methods are intentionally empty as well. */
+    public static class AlternateClass implements Writable {
+        @Override
+        public void write(DataOutput dataOutput) throws IOException {
+        }
+
+        @Override
+        public void readFields(DataInput dataInput) throws IOException {
+        }
+    }
+
+    /** Type infos created for the same writable class are equal and share a hash code. */
+    @Test
+    public void testWritableTypeInfoEquality() {
+        WritableTypeInfo<TestClass> first = new WritableTypeInfo<>(TestClass.class);
+        WritableTypeInfo<TestClass> second = new WritableTypeInfo<>(TestClass.class);
+
+        assertEquals(first, second);
+        assertEquals(first.hashCode(), second.hashCode());
+    }
+
+    /** Type infos created for different writable classes are not equal. */
+    @Test
+    public void testWritableTypeInfoInequality() {
+        WritableTypeInfo<TestClass> testClassInfo = new WritableTypeInfo<>(TestClass.class);
+        WritableTypeInfo<AlternateClass> alternateInfo = new WritableTypeInfo<>(AlternateClass.class);
+
+        assertNotEquals(testClassInfo, alternateInfo);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
index 69dfeb9..e872526 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
@@ -106,4 +106,25 @@ public class IntListSerializer extends TypeSerializer<IntList> {
target.writeInt(source.readInt());
}
}
+
+    /** Equal to any {@code IntListSerializer} whose {@code canEqual} accepts this instance. */
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof IntListSerializer)) {
+            return false;
+        }
+        return ((IntListSerializer) obj).canEqual(this);
+    }
+
+    /** Accepts exactly the objects that are instances of {@code IntListSerializer}. */
+    @Override
+    public boolean canEqual(Object obj) {
+        return obj instanceof IntListSerializer;
+    }
+
+    /** Hash taken from the class object, so every instance reports the same value. */
+    @Override
+    public int hashCode() {
+        return IntListSerializer.class.hashCode();
+    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
index c2571cc..e4a9264 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
@@ -88,6 +88,27 @@ public class IntPairSerializer extends TypeSerializer<IntPair> {
target.write(source, 8);
}
+    /** Equal to any {@code IntPairSerializer} whose {@code canEqual} accepts this instance. */
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof IntPairSerializer)) {
+            return false;
+        }
+        return ((IntPairSerializer) obj).canEqual(this);
+    }
+
+    /** Accepts exactly the objects that are instances of {@code IntPairSerializer}. */
+    @Override
+    public boolean canEqual(Object obj) {
+        return obj instanceof IntPairSerializer;
+    }
+
+    /** Hash taken from the class object, so every instance reports the same value. */
+    @Override
+    public int hashCode() {
+        return IntPairSerializer.class.hashCode();
+    }
+
public static final class IntPairSerializerFactory implements TypeSerializerFactory<IntPair> {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
index 388e8bd..b62b097 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
@@ -83,4 +83,25 @@ public class StringPairSerializer extends TypeSerializer<StringPair> {
StringValue.writeString(StringValue.readString(source), target);
StringValue.writeString(StringValue.readString(source), target);
}
+
+    /** Equal to any {@code StringPairSerializer} whose {@code canEqual} accepts this instance. */
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof StringPairSerializer)) {
+            return false;
+        }
+        return ((StringPairSerializer) obj).canEqual(this);
+    }
+
+    /** Accepts exactly the objects that are instances of {@code StringPairSerializer}. */
+    @Override
+    public boolean canEqual(Object obj) {
+        return obj instanceof StringPairSerializer;
+    }
+
+    /** Hash taken from the class object, so every instance reports the same value. */
+    @Override
+    public int hashCode() {
+        return StringPairSerializer.class.hashCode();
+    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 344b186..5265134 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -296,7 +296,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
"Field \"" + pojoFields(i) + "\" not part of POJO type " +
info.getTypeClass.getCanonicalName);
}
- classesBuf += info.getPojoFieldAt(pos).`type`.getTypeClass
+ classesBuf += info.getPojoFieldAt(pos).getTypeInformation().getTypeClass
}
}
case _ => throw new IllegalArgumentException("Type information is not valid.")
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
index 07f7205..aa76fcc 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
@@ -121,7 +121,7 @@ private[flink] trait TypeInformationGen[C <: Context] {
fieldSerializers(i) = types(i).createSerializer(executionConfig)
}
- new CaseClassSerializer[T](tupleType, fieldSerializers) {
+ new CaseClassSerializer[T](getTypeClass(), fieldSerializers) {
override def createInstance(fields: Array[AnyRef]): T = {
instance.splice
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index 2a76c37..3a015f7 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -17,7 +17,6 @@
*/
package org.apache.flink.api.scala.typeutils
-import org.apache.commons.lang.SerializationUtils
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
import org.apache.flink.core.memory.{DataOutputView, DataInputView}
@@ -29,19 +28,25 @@ import org.apache.flink.core.memory.{DataOutputView, DataInputView}
abstract class CaseClassSerializer[T <: Product](
clazz: Class[T],
scalaFieldSerializers: Array[TypeSerializer[_]])
- extends TupleSerializerBase[T](clazz, scalaFieldSerializers) with Cloneable {
+ extends TupleSerializerBase[T](clazz, scalaFieldSerializers)
+ with Cloneable {
@transient var fields : Array[AnyRef] = _
@transient var instanceCreationFailed : Boolean = false
override def duplicate = {
- val result = this.clone().asInstanceOf[CaseClassSerializer[T]]
+ clone().asInstanceOf[CaseClassSerializer[T]]
+ }
+
+ // Copies this serializer and resets the transient per-instance state on the copy.
+ // NOTE(review): if super.clone() is a shallow Object.clone, result.fieldSerializers is
+ // the same array as this instance's, so the in-place transform below would also replace
+ // the original's field serializers - confirm, and assign a fresh array if so.
+ @throws[CloneNotSupportedException]
+ override protected def clone(): Object = {
+ val result = super.clone().asInstanceOf[CaseClassSerializer[T]]
- // set transient fields to null and make copy of serializers
+ // achieve a deep copy by duplicating the field serializers
+ result.fieldSerializers.transform(_.duplicate())
result.fields = null
result.instanceCreationFailed = false
- result.fieldSerializers = fieldSerializers.map(_.duplicate())
result
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
index 0c8049d..37c7431 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
@@ -18,19 +18,19 @@
package org.apache.flink.api.scala.typeutils
+import java.util
import java.util.regex.{Pattern, Matcher}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeinfo.AtomicType
-import org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException
-import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor
+import org.apache.flink.api.common.typeutils.CompositeType.{TypeComparatorBuilder,
+InvalidFieldReferenceException, FlatFieldDescriptor}
import org.apache.flink.api.common.typeutils._
import org.apache.flink.api.java.operators.Keys.ExpressionKeys
-import org.apache.flink.api.java.typeutils.{TupleTypeInfoBase, PojoTypeInfo}
-import PojoTypeInfo.NamedFlatFieldDescriptor
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
/**
* TypeInformation for Case Classes. Creation and access is different from
@@ -38,7 +38,7 @@ import scala.collection.JavaConverters._
*/
abstract class CaseClassTypeInfo[T <: Product](
clazz: Class[T],
- typeParamTypeInfos: Array[TypeInformation[_]],
+ val typeParamTypeInfos: Array[TypeInformation[_]],
fieldTypes: Seq[TypeInformation[_]],
val fieldNames: Seq[String])
extends TupleTypeInfoBase[T](clazz, fieldTypes: _*) {
@@ -63,46 +63,19 @@ abstract class CaseClassTypeInfo[T <: Product](
fields map { x => fieldNames.indexOf(x) }
}
- /*
- * Comparator construction
- */
- var fieldComparators: Array[TypeComparator[_]] = null
- var logicalKeyFields : Array[Int] = null
- var comparatorHelperIndex = 0
-
- override protected def initializeNewComparator(localKeyCount: Int): Unit = {
- fieldComparators = new Array(localKeyCount)
- logicalKeyFields = new Array(localKeyCount)
- comparatorHelperIndex = 0
- }
-
- override protected def addCompareField(fieldId: Int, comparator: TypeComparator[_]): Unit = {
- fieldComparators(comparatorHelperIndex) = comparator
- logicalKeyFields(comparatorHelperIndex) = fieldId
- comparatorHelperIndex += 1
- }
-
- override protected def getNewComparator(executionConfig: ExecutionConfig): TypeComparator[T] = {
- val finalLogicalKeyFields = logicalKeyFields.take(comparatorHelperIndex)
- val finalComparators = fieldComparators.take(comparatorHelperIndex)
- val maxKey = finalLogicalKeyFields.max
-
- // create serializers only up to the last key, fields after that are not needed
- val fieldSerializers = types.take(maxKey + 1).map(_.createSerializer(executionConfig))
- new CaseClassComparator[T](finalLogicalKeyFields, finalComparators, fieldSerializers.toArray)
- }
-
override def getFlatFields(
fieldExpression: String,
offset: Int,
result: java.util.List[FlatFieldDescriptor]): Unit = {
val matcher: Matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression)
+
if (!matcher.matches) {
throw new InvalidFieldReferenceException("Invalid tuple field reference \"" +
fieldExpression + "\".")
}
var field: String = matcher.group(0)
+
if ((field == ExpressionKeys.SELECT_ALL_CHAR) ||
(field == ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
var keyPosition: Int = 0
@@ -116,59 +89,58 @@ abstract class CaseClassTypeInfo[T <: Product](
}
keyPosition += 1
}
- return
} else {
field = matcher.group(1)
- }
-
- val intFieldMatcher = PATTERN_INT_FIELD.matcher(field)
- if(intFieldMatcher.matches()) {
- // convert 0-indexed integer field into 1-indexed name field
- field = "_" + (Integer.valueOf(field) + 1)
- }
- var pos = offset
- val tail = matcher.group(3)
- if (tail == null) {
+ val intFieldMatcher = PATTERN_INT_FIELD.matcher(field)
+ if(intFieldMatcher.matches()) {
+ // convert 0-indexed integer field into 1-indexed name field
+ field = "_" + (Integer.valueOf(field) + 1)
+ }
- for (i <- 0 until fieldNames.length) {
- if (field == fieldNames(i)) {
- // found field
- fieldTypes(i) match {
- case ct: CompositeType[_] =>
- ct.getFlatFields("*", pos, result)
- return
- case _ =>
- result.add(new FlatFieldDescriptor(pos, fieldTypes(i)))
- return
+ val tail = matcher.group(3)
+
+ if (tail == null) {
+ def extractFlatFields(index: Int, pos: Int): Unit = {
+ if (index >= fieldNames.size) {
+ throw new InvalidFieldReferenceException("Unable to find field \"" + field +
+ "\" in type " + this + ".")
+ } else if (field == fieldNames(index)) {
+ // found field
+ fieldTypes(index) match {
+ case ct: CompositeType[_] =>
+ ct.getFlatFields("*", pos, result)
+ case _ =>
+ result.add(new FlatFieldDescriptor(pos, fieldTypes(index)))
+ }
+ } else {
+ // skipping over non-matching fields
+ extractFlatFields(index + 1, pos + fieldTypes(index).getTotalFields())
}
- } else {
- // skipping over non-matching fields
- pos += fieldTypes(i).getTotalFields
}
- }
- throw new InvalidFieldReferenceException("Unable to find field \"" + field +
- "\" in type " + this + ".")
- } else {
- var pos = offset
- for (i <- 0 until fieldNames.length) {
- if (field == fieldNames(i)) {
- // found field
- fieldTypes(i) match {
- case ct: CompositeType[_] =>
- ct.getFlatFields(tail, pos, result)
- return
- case _ =>
- throw new InvalidFieldReferenceException("Nested field expression \"" + tail +
- "\" not possible on atomic type " + fieldTypes(i) + ".")
+
+ extractFlatFields(0, offset)
+ } else {
+ def extractFlatFields(index: Int, pos: Int): Unit = {
+ if (index >= fieldNames.size) {
+ throw new InvalidFieldReferenceException("Unable to find field \"" + field +
+ "\" in type " + this + ".")
+ } else if (field == fieldNames(index)) {
+ // found field
+ fieldTypes(index) match {
+ case ct: CompositeType[_] =>
+ ct.getFlatFields(tail, pos, result)
+ case _ =>
+ throw new InvalidFieldReferenceException("Nested field expression \"" + tail +
+ "\" not possible on atomic type " + fieldTypes(index) + ".")
+ }
+ } else {
+ extractFlatFields(index + 1, pos + fieldTypes(index).getTotalFields())
}
- } else {
- // skipping over non-matching fields
- pos += fieldTypes(i).getTotalFields
}
+
+ extractFlatFields(0, offset)
}
- throw new InvalidFieldReferenceException("Unable to find field \"" + field +
- "\" in type " + this + ".")
}
}
@@ -195,7 +167,7 @@ abstract class CaseClassTypeInfo[T <: Product](
field = "_" + (Integer.valueOf(field) + 1)
}
- for (i <- 0 until fieldNames.length) {
+ for (i <- fieldNames.indices) {
if (fieldNames(i) == field) {
if (tail == null) {
return getTypeAt(i)
@@ -225,9 +197,57 @@ abstract class CaseClassTypeInfo[T <: Product](
}
}
- override def toString = clazz.getName + "(" + fieldNames.zip(types).map {
- case (n, t) => n + ": " + t}
- .mkString(", ") + ")"
+ /** Hands out a new builder instance on every call. */
+ override def createTypeComparatorBuilder(): TypeComparatorBuilder[T] =
+   new CaseClassTypeComparatorBuilder
+
+ /**
+  * Accumulates key-field ids and their comparators in call order and assembles them
+  * into a [[CaseClassComparator]].
+  */
+ private class CaseClassTypeComparatorBuilder extends TypeComparatorBuilder[T] {
+   val fieldComparators: ArrayBuffer[TypeComparator[_]] = new ArrayBuffer[TypeComparator[_]]()
+   val logicalKeyFields: ArrayBuffer[Int] = new ArrayBuffer[Int]()
+
+   /** Pre-sizes both buffers using the `size` hint instead of ignoring it. */
+   override def initializeTypeComparatorBuilder(size: Int): Unit = {
+     fieldComparators.sizeHint(size)
+     logicalKeyFields.sizeHint(size)
+   }
+
+   /** Appends one key field; both buffers stay index-aligned. */
+   override def addComparatorField(fieldId: Int, comparator: TypeComparator[_]): Unit = {
+     fieldComparators += comparator
+     logicalKeyFields += fieldId
+   }
+
+   /**
+    * Builds the comparator from everything added so far. `max` fails on an empty buffer,
+    * so at least one field must have been added before this is called.
+    */
+   override def createTypeComparator(config: ExecutionConfig): TypeComparator[T] = {
+     val maxIndex = logicalKeyFields.max
+
+     new CaseClassComparator[T](
+       logicalKeyFields.toArray,
+       fieldComparators.toArray,
+       // serializers are only created for fields up to the highest key index
+       types.take(maxIndex + 1).map(_.createSerializer(config))
+     )
+   }
+ }
+
+ /** Renders as `ClassName(field: type, ...)`. */
+ override def toString: String = {
+   val renderedFields = fieldNames.zip(types).map { case (name, tpe) => name + ": " + tpe }
+   clazz.getName + "(" + renderedFields.mkString(", ") + ")"
+ }
override def isCaseClass = true
+
+ /**
+  * Equal to another [[CaseClassTypeInfo]] that accepts this instance, passes the
+  * superclass comparison and has the same type-parameter infos and field names.
+  */
+ override def equals(obj: Any): Boolean = obj match {
+   case that: CaseClassTypeInfo[_] =>
+     val baseMatches = that.canEqual(this) && super.equals(that)
+     baseMatches &&
+       typeParamTypeInfos.sameElements(that.typeParamTypeInfos) &&
+       fieldNames.equals(that.fieldNames)
+   case _ => false
+ }
+
+ /** Combines the superclass hash, the field names and the type-parameter infos. */
+ override def hashCode(): Int = {
+   val baseAndNames = 31 * super.hashCode() + fieldNames.hashCode()
+   val typeParams = util.Arrays.hashCode(typeParamTypeInfos.asInstanceOf[Array[AnyRef]])
+   31 * baseAndNames + typeParams
+ }
+
+ /** Accepts any [[CaseClassTypeInfo]] as a candidate for equality. */
+ override def canEqual(obj: Any): Boolean = obj match {
+   case _: CaseClassTypeInfo[_] => true
+   case _ => false
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
index 65628a0..2efc207 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala
@@ -86,11 +86,20 @@ class EitherSerializer[A, B, T <: Either[A, B]](
}
override def equals(obj: Any): Boolean = {
- if (obj != null && obj.isInstanceOf[EitherSerializer[_, _, _]]) {
- val other = obj.asInstanceOf[EitherSerializer[_, _, _]]
- other.leftSerializer.equals(leftSerializer) && other.rightSerializer.equals(rightSerializer)
- } else {
- false
+ obj match {
+ case eitherSerializer: EitherSerializer[_, _, _] =>
+ eitherSerializer.canEqual(this) &&
+ leftSerializer.equals(eitherSerializer.leftSerializer) &&
+ rightSerializer.equals(eitherSerializer.rightSerializer)
+ case _ => false
}
}
+
+ /** Accepts any [[EitherSerializer]] as a candidate for equality. */
+ override def canEqual(obj: Any): Boolean = obj match {
+   case _: EitherSerializer[_, _, _] => true
+   case _ => false
+ }
+
+ /** Derived from the left and right serializers, the same pair `equals` compares. */
+ override def hashCode(): Int = {
+   val leftHash = leftSerializer.hashCode()
+   val rightHash = rightSerializer.hashCode()
+   31 * leftHash + rightHash
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
index a1cded7..2beebde 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfo.scala
@@ -27,9 +27,9 @@ import scala.collection.JavaConverters._
* TypeInformation [[Either]].
*/
class EitherTypeInfo[A, B, T <: Either[A, B]](
- clazz: Class[T],
- leftTypeInfo: TypeInformation[A],
- rightTypeInfo: TypeInformation[B])
+ val clazz: Class[T],
+ val leftTypeInfo: TypeInformation[A],
+ val rightTypeInfo: TypeInformation[B])
extends TypeInformation[T] {
override def isBasicType: Boolean = false
@@ -55,5 +55,24 @@ class EitherTypeInfo[A, B, T <: Either[A, B]](
new EitherSerializer(leftSerializer, rightSerializer)
}
+ /**
+  * Equal to another [[EitherTypeInfo]] that accepts this instance and has the same
+  * class, left type info and right type info.
+  */
+ override def equals(obj: Any): Boolean = obj match {
+   case that: EitherTypeInfo[_, _, _] if that.canEqual(this) =>
+     clazz.equals(that.clazz) &&
+       leftTypeInfo.equals(that.leftTypeInfo) &&
+       rightTypeInfo.equals(that.rightTypeInfo)
+   case _ => false
+ }
+
+ /** Accepts any [[EitherTypeInfo]] as a candidate for equality. */
+ override def canEqual(obj: Any): Boolean = obj match {
+   case _: EitherTypeInfo[_, _, _] => true
+   case _ => false
+ }
+
+ /** Combines the class, left type info and right type info, mirroring `equals`. */
+ override def hashCode(): Int = {
+   val classAndLeft = 31 * clazz.hashCode() + leftTypeInfo.hashCode()
+   31 * classAndLeft + rightTypeInfo.hashCode()
+ }
+
override def toString = s"Either[$leftTypeInfo, $rightTypeInfo]"
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
index 8d03676..7c3bc95 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
@@ -51,15 +51,18 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[
override def deserialize(reuse: T, source: DataInputView): T = deserialize(source)
override def equals(obj: Any): Boolean = {
- if (obj != null && obj.isInstanceOf[EnumValueSerializer[_]]) {
- val other = obj.asInstanceOf[EnumValueSerializer[_]]
- this.enum == other.enum
- } else {
- false
+ obj match {
+ case enumValueSerializer: EnumValueSerializer[_] =>
+ enumValueSerializer.canEqual(this) && enum == enumValueSerializer.enum
+ case _ => false
}
}
override def hashCode(): Int = {
enum.hashCode()
}
+
+ /** Accepts any [[EnumValueSerializer]] as a candidate for equality. */
+ override def canEqual(obj: scala.Any): Boolean = obj match {
+   case _: EnumValueSerializer[_] => true
+   case _ => false
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
index c66e4bc..e3d665e 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfo.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
/**
* TypeInformation for [[Enumeration]] values.
*/
-class EnumValueTypeInfo[E <: Enumeration](enum: E, clazz: Class[E#Value])
+class EnumValueTypeInfo[E <: Enumeration](val enum: E, val clazz: Class[E#Value])
extends TypeInformation[E#Value] with AtomicType[E#Value] {
type T = E#Value
@@ -49,4 +49,22 @@ class EnumValueTypeInfo[E <: Enumeration](enum: E, clazz: Class[E#Value])
}
override def toString = clazz.getCanonicalName
+
+ /** Combines the enumeration and the value class, the same pair `equals` compares. */
+ override def hashCode(): Int = {
+   val enumHash = enum.hashCode()
+   31 * enumHash + clazz.hashCode()
+ }
+
+ /**
+  * Equal to another [[EnumValueTypeInfo]] that accepts this instance and wraps the same
+  * enumeration and value class.
+  */
+ override def equals(obj: Any): Boolean = {
+   obj match {
+     // type arguments are erased at runtime, so `E` cannot be tested; match the wildcard
+     case enumValueTypeInfo: EnumValueTypeInfo[_] =>
+       enumValueTypeInfo.canEqual(this) &&
+         enum.equals(enumValueTypeInfo.enum) &&
+         clazz.equals(enumValueTypeInfo.clazz)
+     case _ => false
+   }
+ }
+
+ /**
+  * Accepts any [[EnumValueTypeInfo]] as a candidate for equality. The wildcard is used
+  * because the type argument is erased and cannot be checked by `isInstanceOf`.
+  */
+ override def canEqual(obj: Any): Boolean = {
+   obj.isInstanceOf[EnumValueTypeInfo[_]]
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
index 147a060..a6a7954 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala
@@ -56,6 +56,17 @@ class NothingSerializer extends TypeSerializer[Any] {
throw new RuntimeException("This must not be used. You encountered a bug.")
override def equals(obj: Any): Boolean = {
- obj != null && obj.isInstanceOf[NothingSerializer]
+ obj match {
+ case nothingSerializer: NothingSerializer => nothingSerializer.canEqual(this)
+ case _ => false
+ }
+ }
+
+ /** Accepts any [[NothingSerializer]] as a candidate for equality. */
+ override def canEqual(obj: scala.Any): Boolean = obj match {
+   case _: NothingSerializer => true
+   case _ => false
+ }
+
+ // Uses the hash of the class object, so every instance hashes alike.
+ override def hashCode(): Int = {
+   val marker: Class[_] = classOf[NothingSerializer]
+   marker.hashCode()
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index 488710d..af2091b 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -71,11 +71,18 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
override def deserialize(reuse: Option[A], source: DataInputView): Option[A] = deserialize(source)
override def equals(obj: Any): Boolean = {
- if (obj != null && obj.isInstanceOf[OptionSerializer[_]]) {
- val other = obj.asInstanceOf[OptionSerializer[_]]
- other.elemSerializer.equals(elemSerializer)
- } else {
- false
+ obj match {
+ case optionSerializer: OptionSerializer[_] =>
+ optionSerializer.canEqual(this) && elemSerializer.equals(optionSerializer.elemSerializer)
+ case _ => false
}
}
+
+ /** Accepts any [[OptionSerializer]] as a candidate for equality. */
+ override def canEqual(obj: scala.Any): Boolean = obj match {
+   case _: OptionSerializer[_] => true
+   case _ => false
+ }
+
+ /** Delegates to the element serializer, the only member `equals` compares. */
+ override def hashCode(): Int = elemSerializer.hashCode()
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
index 510b604..2aff2dd 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
/**
* TypeInformation for [[Option]].
*/
-class OptionTypeInfo[A, T <: Option[A]](elemTypeInfo: TypeInformation[A])
+class OptionTypeInfo[A, T <: Option[A]](private val elemTypeInfo: TypeInformation[A])
extends TypeInformation[T] {
override def isBasicType: Boolean = false
@@ -49,4 +49,20 @@ class OptionTypeInfo[A, T <: Option[A]](elemTypeInfo: TypeInformation[A])
}
override def toString = s"Option[$elemTypeInfo]"
+
+ /** Equal to another [[OptionTypeInfo]] that accepts this one and wraps an equal element type. */
+ override def equals(obj: Any): Boolean = obj match {
+   case that: OptionTypeInfo[_, _] if that.canEqual(this) =>
+     elemTypeInfo.equals(that.elemTypeInfo)
+   case _ => false
+ }
+
+ /** Accepts any [[OptionTypeInfo]] as a candidate for equality. */
+ override def canEqual(obj: Any): Boolean = {
+   obj.isInstanceOf[OptionTypeInfo[_, _]]
+ }
+
+ /** Delegates to the element type info, the only member `equals` compares. */
+ override def hashCode: Int = elemTypeInfo.hashCode()
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
index 38fd14b..d242863 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -134,11 +134,18 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], E](
}
override def equals(obj: Any): Boolean = {
- if (obj != null && obj.isInstanceOf[TraversableSerializer[_, _]]) {
- val other = obj.asInstanceOf[TraversableSerializer[_, _]]
- other.elementSerializer.equals(elementSerializer)
- } else {
- false
+ obj match {
+ case other: TraversableSerializer[_, _] =>
+ other.canEqual(this) && elementSerializer.equals(other.elementSerializer)
+ case _ => false
}
}
+
+ /** Delegates to the element serializer, the only member `equals` compares. */
+ override def hashCode(): Int = elementSerializer.hashCode()
+
+ /** Accepts any [[TraversableSerializer]] as a candidate for equality. */
+ override def canEqual(obj: Any): Boolean = obj match {
+   case _: TraversableSerializer[_, _] => true
+   case _ => false
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
index 76067bb..8948b0c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
@@ -23,13 +23,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer
import scala.collection.JavaConverters._
-import scala.collection.generic.CanBuildFrom
-
/**
* TypeInformation for Scala Collections.
*/
abstract class TraversableTypeInfo[T <: TraversableOnce[E], E](
- clazz: Class[T],
+ val clazz: Class[T],
val elementTypeInfo: TypeInformation[E])
extends TypeInformation[T] {
@@ -41,17 +39,25 @@ abstract class TraversableTypeInfo[T <: TraversableOnce[E], E](
override def getTypeClass: Class[T] = clazz
override def getGenericParameters = List[TypeInformation[_]](elementTypeInfo).asJava
-
def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T]
override def equals(other: Any): Boolean = {
- if (other.isInstanceOf[TraversableTypeInfo[_, _]]) {
- val otherTrav = other.asInstanceOf[TraversableTypeInfo[_, _]]
- otherTrav.getTypeClass == getTypeClass && otherTrav.elementTypeInfo == elementTypeInfo
- } else {
- false
+ other match {
+ case traversable: TraversableTypeInfo[_, _] =>
+ traversable.canEqual(this) &&
+ clazz == traversable.clazz &&
+ elementTypeInfo.equals(traversable.elementTypeInfo)
+ case _ => false
}
}
+ /** Combines the collection class and the element type info, mirroring `equals`. */
+ override def hashCode(): Int = {
+   val classHash = clazz.hashCode()
+   31 * classHash + elementTypeInfo.hashCode()
+ }
+
+ /** Accepts any [[TraversableTypeInfo]] as a candidate for equality. */
+ override def canEqual(obj: Any): Boolean = obj match {
+   case _: TraversableTypeInfo[_, _] => true
+   case _ => false
+ }
+
override def toString = s"$clazz[$elementTypeInfo]"
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
index b7ceadf..99aae21 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
@@ -27,7 +27,9 @@ import scala.util.{Success, Try, Failure}
/**
* Serializer for [[scala.util.Try]].
*/
-class TrySerializer[A](val elemSerializer: TypeSerializer[A], executionConfig: ExecutionConfig)
+class TrySerializer[A](
+ private val elemSerializer: TypeSerializer[A],
+ private val executionConfig: ExecutionConfig)
extends TypeSerializer[Try[A]] {
override def duplicate: TrySerializer[A] = this
@@ -80,11 +82,18 @@ class TrySerializer[A](val elemSerializer: TypeSerializer[A], executionConfig: E
override def deserialize(reuse: Try[A], source: DataInputView): Try[A] = deserialize(source)
override def equals(obj: Any): Boolean = {
- if (obj != null && obj.isInstanceOf[TrySerializer[_]]) {
- val other = obj.asInstanceOf[TrySerializer[_]]
- other.elemSerializer.equals(elemSerializer)
- } else {
- false
+ obj match {
+ case other: TrySerializer[_] =>
+ other.canEqual(this) && elemSerializer.equals(other.elemSerializer)
+ case _ => false
}
}
+
+ /** Accepts any [[TrySerializer]] as a candidate for equality. */
+ override def canEqual(obj: Any): Boolean = obj match {
+   case _: TrySerializer[_] => true
+   case _ => false
+ }
+
+ /**
+  * Hashes only the element serializer. `equals` in this class does not compare
+  * `executionConfig`, so mixing it in here could give equal serializers different hashes.
+  */
+ override def hashCode(): Int = {
+   elemSerializer.hashCode()
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
index 3749b37..f3f2ce2 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TryTypeInfo.scala
@@ -28,7 +28,7 @@ import scala.util.Try
/**
* TypeInformation for [[scala.util.Try]].
*/
-class TryTypeInfo[A, T <: Try[A]](elemTypeInfo: TypeInformation[A])
+class TryTypeInfo[A, T <: Try[A]](val elemTypeInfo: TypeInformation[A])
extends TypeInformation[T] {
override def isBasicType: Boolean = false
@@ -49,5 +49,22 @@ class TryTypeInfo[A, T <: Try[A]](elemTypeInfo: TypeInformation[A])
}
}
+ /** Equal to another [[TryTypeInfo]] that accepts this one and wraps an equal element type. */
+ override def equals(obj: Any): Boolean = obj match {
+   case that: TryTypeInfo[_, _] if that.canEqual(this) =>
+     elemTypeInfo.equals(that.elemTypeInfo)
+   case _ => false
+ }
+
+ /** Accepts any [[TryTypeInfo]] as a candidate for equality. */
+ override def canEqual(obj: Any): Boolean = obj match {
+   case _: TryTypeInfo[_, _] => true
+   case _ => false
+ }
+
+ /** Delegates to the element type info, the only member `equals` compares. */
+ override def hashCode(): Int = elemTypeInfo.hashCode()
+
override def toString = s"Try[$elemTypeInfo]"
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala
new file mode 100644
index 0000000..479483f
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.util.TestLogger
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+// Exercises equals/hashCode of CaseClassTypeInfo. Each instance is its own anonymous
+// subclass whose createSerializer is left as `???`; the tests only call equals and hashCode.
+class CaseClassTypeInfoTest extends TestLogger with JUnitSuiteLike {
+
+ // Two separately built infos with the same class, field types and field names
+ // must be equal and produce the same hash code.
+ @Test
+ def testCaseClassTypeInfoEquality(): Unit = {
+ val tpeInfo1 = new CaseClassTypeInfo[Tuple2[Int, String]](
+ classOf[Tuple2[Int, String]],
+ Array(),
+ Array(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
+ Array("_1", "_2")) {
+ override def createSerializer(config: ExecutionConfig): TypeSerializer[(Int, String)] = ???
+ }
+
+ val tpeInfo2 = new CaseClassTypeInfo[Tuple2[Int, String]](
+ classOf[Tuple2[Int, String]],
+ Array(),
+ Array(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
+ Array("_1", "_2")) {
+ override def createSerializer(config: ExecutionConfig): TypeSerializer[(Int, String)] = ???
+ }
+
+ assert(tpeInfo1.equals(tpeInfo2))
+ assert(tpeInfo1.hashCode() == tpeInfo2.hashCode())
+ }
+
+ // Infos that differ only in the second field's type (String vs Boolean) must not be equal.
+ @Test
+ def testCaseClassTypeInfoInequality(): Unit = {
+ val tpeInfo1 = new CaseClassTypeInfo[Tuple2[Int, String]](
+ classOf[Tuple2[Int, String]],
+ Array(),
+ Array(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
+ Array("_1", "_2")) {
+ override def createSerializer(config: ExecutionConfig): TypeSerializer[(Int, String)] = ???
+ }
+
+ val tpeInfo2 = new CaseClassTypeInfo[Tuple2[Int, Boolean]](
+ classOf[Tuple2[Int, Boolean]],
+ Array(),
+ Array(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO),
+ Array("_1", "_2")) {
+ override def createSerializer(config: ExecutionConfig): TypeSerializer[(Int, Boolean)] = ???
+ }
+
+ assert(!tpeInfo1.equals(tpeInfo2))
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfoTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfoTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfoTest.scala
new file mode 100644
index 0000000..e23a6a0
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EitherTypeInfoTest.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.util.TestLogger
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+/**
+ * Exercises `equals` and `hashCode` of [[EitherTypeInfo]].
+ */
+class EitherTypeInfoTest extends TestLogger with JUnitSuiteLike {
+
+ /** Two type infos built from identical left/right element types must be equal. */
+ @Test
+ def testEitherTypeEquality(): Unit = {
+ val first = new EitherTypeInfo[Integer, String, Either[Integer, String]](
+ classOf[Either[Integer, String]],
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO
+ )
+ val second = new EitherTypeInfo[Integer, String, Either[Integer, String]](
+ classOf[Either[Integer, String]],
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO
+ )
+
+ assert(first.equals(second))
+ assert(first.hashCode == second.hashCode)
+ }
+
+ /** Type infos whose right element types differ (Integer vs. String) must not be equal. */
+ @Test
+ def testEitherTypeInequality(): Unit = {
+ val intIntInfo = new EitherTypeInfo[Integer, Integer, Either[Integer, Integer]](
+ classOf[Either[Integer, Integer]],
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO
+ )
+ val intStringInfo = new EitherTypeInfo[Integer, String, Either[Integer, String]](
+ classOf[Either[Integer, String]],
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO
+ )
+
+ assert(!intIntInfo.equals(intStringInfo))
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfoTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfoTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfoTest.scala
new file mode 100644
index 0000000..acd6a39
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueTypeInfoTest.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.scala.typeutils.AlternateEnumeration.AlternateEnumeration
+import org.apache.flink.api.scala.typeutils.TestEnumeration.TestEnumeration
+import org.apache.flink.util.TestLogger
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+/**
+ * Exercises `equals` and `hashCode` of [[EnumValueTypeInfo]].
+ */
+class EnumValueTypeInfoTest extends TestLogger with JUnitSuiteLike {
+
+ /** Two type infos created for the same enumeration must be equal and share a hash code. */
+ @Test
+ def testEnumValueTypeInfoEquality(): Unit = {
+ val first = new EnumValueTypeInfo[TestEnumeration.type](
+ TestEnumeration,
+ classOf[TestEnumeration])
+ val second = new EnumValueTypeInfo[TestEnumeration.type](
+ TestEnumeration,
+ classOf[TestEnumeration])
+
+ assert(first.equals(second))
+ assert(first.hashCode == second.hashCode)
+ }
+
+ /** Type infos created for two different enumeration objects must not be equal. */
+ @Test
+ def testEnumValueTypeInfoInequality(): Unit = {
+ val testEnumInfo = new EnumValueTypeInfo[TestEnumeration.type](
+ TestEnumeration,
+ classOf[TestEnumeration])
+ val alternateEnumInfo = new EnumValueTypeInfo[AlternateEnumeration.type](
+ AlternateEnumeration,
+ classOf[AlternateEnumeration])
+
+ assert(!testEnumInfo.equals(alternateEnumInfo))
+ }
+
+}
+
+/** Single-value enumeration used as a test fixture. */
+object TestEnumeration extends Enumeration {
+ type TestEnumeration = Value
+ val ONE = Value
+}
+
+/** Second single-value enumeration, distinct from TestEnumeration, used as a test fixture. */
+object AlternateEnumeration extends Enumeration {
+ type AlternateEnumeration = Value
+ val TWO = Value
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfoTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfoTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfoTest.scala
new file mode 100644
index 0000000..b765658
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfoTest.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.util.TestLogger
+import org.junit.Test
+import org.scalatest.junit.{JUnitSuiteLike, JUnitSuite}
+
+/**
+ * Exercises `equals` and `hashCode` of [[OptionTypeInfo]].
+ */
+class OptionTypeInfoTest extends TestLogger with JUnitSuiteLike {
+
+ /** Two type infos wrapping the same element type must be equal and share a hash code. */
+ @Test
+ def testOptionTypeEquality(): Unit = {
+ val optionTypeInfo1 = new OptionTypeInfo[Integer, Option[Integer]](BasicTypeInfo.INT_TYPE_INFO)
+ val optionTypeInfo2 = new OptionTypeInfo[Integer, Option[Integer]](BasicTypeInfo.INT_TYPE_INFO)
+
+ assert(optionTypeInfo1.equals(optionTypeInfo2))
+ assert(optionTypeInfo1.hashCode == optionTypeInfo2.hashCode)
+ }
+
+ /** Type infos wrapping different element types must not be equal. */
+ @Test
+ def testOptionTypeInequality(): Unit = {
+ val optionTypeInfo1 = new OptionTypeInfo[Integer, Option[Integer]](BasicTypeInfo.INT_TYPE_INFO)
+ val optionTypeInfo2 = new OptionTypeInfo[String, Option[String]](BasicTypeInfo.STRING_TYPE_INFO)
+
+ assert(!optionTypeInfo1.equals(optionTypeInfo2))
+ }
+
+ /** An OptionTypeInfo must not be equal to a type info of an unrelated kind. */
+ @Test
+ def testOptionTypeInequalityWithDifferentType(): Unit = {
+ val optionTypeInfo = new OptionTypeInfo[Integer, Option[Integer]](BasicTypeInfo.INT_TYPE_INFO)
+ // classOf[Double] is the Class of the Double type itself; Double.getClass would instead
+ // yield the class of the Double companion object and require an unsound cast.
+ val genericTypeInfo = new GenericTypeInfo[Double](classOf[Double])
+
+ assert(!optionTypeInfo.equals(genericTypeInfo))
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfoTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfoTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfoTest.scala
new file mode 100644
index 0000000..e83b326
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfoTest.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{TypeInformation, BasicTypeInfo}
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.util.TestLogger
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+/**
+ * Exercises `equals` and `hashCode` of [[TraversableTypeInfo]].
+ *
+ * `createSerializer` is stubbed with `???` in every fixture because the tests only call
+ * `equals` and `hashCode`.
+ */
+class TraversableTypeInfoTest extends TestLogger with JUnitSuiteLike {
+
+ /** Two type infos with the same collection class and element type must be equal. */
+ @Test
+ def testTraversableTypeInfoEquality(): Unit = {
+ val tpeInfo1 = new TraversableTypeInfo[Seq[Int], Int](
+ classOf[Seq[Int]],
+ BasicTypeInfo.INT_TYPE_INFO.asInstanceOf[TypeInformation[Int]]) {
+ override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Seq[Int]] =
+ ???
+ }
+
+ val tpeInfo2 = new TraversableTypeInfo[Seq[Int], Int](
+ classOf[Seq[Int]],
+ BasicTypeInfo.INT_TYPE_INFO.asInstanceOf[TypeInformation[Int]]) {
+ override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Seq[Int]] =
+ ???
+ }
+
+ assert(tpeInfo1.equals(tpeInfo2))
+ // Equal objects must report equal hash codes; the sibling type-info tests check this too.
+ assert(tpeInfo1.hashCode == tpeInfo2.hashCode)
+ }
+
+ /** Type infos with different collection classes (Seq vs. List) must not be equal. */
+ @Test
+ def testTraversableTypeInfoInequality(): Unit = {
+ val tpeInfo1 = new TraversableTypeInfo[Seq[Int], Int](
+ classOf[Seq[Int]],
+ BasicTypeInfo.INT_TYPE_INFO.asInstanceOf[TypeInformation[Int]]) {
+ override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Seq[Int]] =
+ ???
+ }
+
+ val tpeInfo2 = new TraversableTypeInfo[List[Int], Int](
+ classOf[List[Int]],
+ BasicTypeInfo.INT_TYPE_INFO.asInstanceOf[TypeInformation[Int]]) {
+ override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[List[Int]] =
+ ???
+ }
+
+ assert(!tpeInfo1.equals(tpeInfo2))
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TryTypeInfoTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TryTypeInfoTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TryTypeInfoTest.scala
new file mode 100644
index 0000000..3a5fc80
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TryTypeInfoTest.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
+import org.apache.flink.util.TestLogger
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+import scala.util.Try
+
+/**
+ * Exercises `equals` and `hashCode` of [[TryTypeInfo]].
+ */
+class TryTypeInfoTest extends TestLogger with JUnitSuiteLike {
+
+ /** Two type infos wrapping the same element type must be equal and share a hash code. */
+ @Test
+ def testTryTypeEquality(): Unit = {
+ val tryTypeInfo1 = new TryTypeInfo[Integer, Try[Integer]](BasicTypeInfo.INT_TYPE_INFO)
+ val tryTypeInfo2 = new TryTypeInfo[Integer, Try[Integer]](BasicTypeInfo.INT_TYPE_INFO)
+
+ assert(tryTypeInfo1.equals(tryTypeInfo2))
+ assert(tryTypeInfo1.hashCode == tryTypeInfo2.hashCode)
+ }
+
+ /** Type infos wrapping different element types must not be equal. */
+ @Test
+ def testTryTypeInequality(): Unit = {
+ val tryTypeInfo1 = new TryTypeInfo[Integer, Try[Integer]](BasicTypeInfo.INT_TYPE_INFO)
+ val tryTypeInfo2 = new TryTypeInfo[String, Try[String]](BasicTypeInfo.STRING_TYPE_INFO)
+
+ assert(!tryTypeInfo1.equals(tryTypeInfo2))
+ }
+
+ /** A TryTypeInfo must not be equal to a type info of an unrelated kind. */
+ @Test
+ def testTryTypeInequalityWithDifferentType(): Unit = {
+ // Lower-case local name so the value does not shadow the TryTypeInfo class name.
+ val tryTypeInfo = new TryTypeInfo[Integer, Try[Integer]](BasicTypeInfo.INT_TYPE_INFO)
+ // classOf[Double] is the Class of the Double type itself; Double.getClass would instead
+ // yield the class of the Double companion object and require an unsound cast.
+ val genericTypeInfo = new GenericTypeInfo[Double](classOf[Double])
+
+ assert(!tryTypeInfo.equals(genericTypeInfo))
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
index 229cb4a..529850f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.windowing;
import java.io.IOException;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -37,6 +38,8 @@ public final class StreamWindowSerializer<T> extends TypeSerializer<StreamWindow
TypeSerializer<Boolean> boolSerializer = BooleanSerializer.INSTANCE;
public StreamWindowSerializer(TypeInformation<T> typeInfo, ExecutionConfig conf) {
+ Preconditions.checkNotNull(typeInfo);
+
this.typeSerializer = typeInfo.createSerializer(conf);
}
@@ -118,6 +121,27 @@ public final class StreamWindowSerializer<T> extends TypeSerializer<StreamWindow
}
@Override
+ public boolean equals(Object obj) {
+ // Anything that is not a StreamWindowSerializer (including null) is never equal.
+ if (!(obj instanceof StreamWindowSerializer)) {
+ return false;
+ }
+
+ StreamWindowSerializer<?> that = (StreamWindowSerializer<?>) obj;
+
+ // Let the other side veto the comparison first, then compare the wrapped serializers.
+ return that.canEqual(this) && typeSerializer.equals(that.typeSerializer);
+ }
+
+ /**
+ * Reports whether {@code obj} may take part in an equality comparison with this
+ * serializer, i.e. whether it is a {@code StreamWindowSerializer}. {@code equals}
+ * consults this on the other operand before comparing the wrapped serializers.
+ */
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof StreamWindowSerializer;
+ }
+
+ /**
+ * Returns the hash code of the wrapped {@code typeSerializer}, which is the only field
+ * that {@code equals} compares.
+ */
+ @Override
+ public int hashCode() {
+ return typeSerializer.hashCode();
+ }
+
+ @Override
public TypeSerializer<StreamWindow<T>> duplicate() {
return this;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfo.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfo.java
index 3bcc253..2c0a999 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfo.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfo.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.windowing;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -25,10 +26,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
public class StreamWindowTypeInfo<T> extends TypeInformation<StreamWindow<T>> {
private static final long serialVersionUID = 1L;
- TypeInformation<T> innerType;
+
+ final TypeInformation<T> innerType;
public StreamWindowTypeInfo(TypeInformation<T> innerType) {
- this.innerType = innerType;
+ this.innerType = Preconditions.checkNotNull(innerType);
}
public TypeInformation<T> getInnerType() {
@@ -50,10 +52,10 @@ public class StreamWindowTypeInfo<T> extends TypeInformation<StreamWindow<T>> {
return innerType.getArity();
}
+ @SuppressWarnings("unchecked")
@Override
public Class<StreamWindow<T>> getTypeClass() {
- // TODO Auto-generated method stub
- return null;
+ return (Class<StreamWindow<T>>)(Object)StreamWindow.class;
}
@Override
@@ -67,6 +69,34 @@ public class StreamWindowTypeInfo<T> extends TypeInformation<StreamWindow<T>> {
}
@Override
+ public String toString() {
+ // Renders the runtime simple class name followed by the inner type in angle brackets.
+ return getClass().getSimpleName() + "<" + innerType + ">";
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof StreamWindowTypeInfo) {
+ @SuppressWarnings("unchecked")
+ StreamWindowTypeInfo<T> streamWindowTypeInfo = (StreamWindowTypeInfo<T>) obj;
+
+ return streamWindowTypeInfo.canEqual(this) &&
+ innerType.equals(streamWindowTypeInfo.innerType);
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Returns the hash code of {@code innerType}, which is the only field that
+ * {@code equals} compares.
+ */
+ @Override
+ public int hashCode() {
+ return innerType.hashCode();
+ }
+
+ /**
+ * Reports whether {@code obj} may take part in an equality comparison with this type
+ * information, i.e. whether it is a {@code StreamWindowTypeInfo}. {@code equals}
+ * consults this on the other operand before comparing the inner types.
+ */
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof StreamWindowTypeInfo;
+ }
+
+ @Override
public int getTotalFields() {
return innerType.getTotalFields();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
index 156e5d6..d4363cd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -164,4 +164,25 @@ public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<
typeSerializer.copy(source, target);
}
}
+
+ /**
+ * Compares this serializer with {@code obj}.
+ *
+ * @param obj the object to compare with, may be {@code null}
+ * @return {@code true} if {@code obj} is a {@code MultiplexingStreamRecordSerializer} that
+ * accepts the comparison via {@code canEqual} and wraps an equal serializer
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof MultiplexingStreamRecordSerializer)) {
+ return false;
+ }
+
+ MultiplexingStreamRecordSerializer<?> that = (MultiplexingStreamRecordSerializer<?>) obj;
+
+ return that.canEqual(this) && typeSerializer.equals(that.typeSerializer);
+ }
+
+ /**
+ * Reports whether {@code obj} may take part in an equality comparison with this
+ * serializer, i.e. whether it is a {@code MultiplexingStreamRecordSerializer}.
+ * {@code equals} consults this on the other operand before comparing the wrapped
+ * serializers.
+ */
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof MultiplexingStreamRecordSerializer;
+ }
+
+ /**
+ * Returns the hash code of the wrapped {@code typeSerializer}, which is the only field
+ * that {@code equals} compares.
+ */
+ @Override
+ public int hashCode() {
+ return typeSerializer.hashCode();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
index e58d3c8..d47da50 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -122,4 +122,25 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
public void copy(DataInputView source, DataOutputView target) throws IOException {
typeSerializer.copy(source, target);
}
+
+ /**
+ * Compares this serializer with {@code obj}.
+ *
+ * @param obj the object to compare with, may be {@code null}
+ * @return {@code true} if {@code obj} is a {@code StreamRecordSerializer} that accepts the
+ * comparison via {@code canEqual} and wraps an equal serializer
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof StreamRecordSerializer)) {
+ return false;
+ }
+
+ StreamRecordSerializer<?> that = (StreamRecordSerializer<?>) obj;
+
+ return that.canEqual(this) && typeSerializer.equals(that.typeSerializer);
+ }
+
+ /**
+ * Reports whether {@code obj} may take part in an equality comparison with this
+ * serializer, i.e. whether it is a {@code StreamRecordSerializer}. {@code equals}
+ * consults this on the other operand before comparing the wrapped serializers.
+ */
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof StreamRecordSerializer;
+ }
+
+ /**
+ * Returns the hash code of the wrapped {@code typeSerializer}, which is the only field
+ * that {@code equals} compares.
+ */
+ @Override
+ public int hashCode() {
+ return typeSerializer.hashCode();
+ }
}