You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/17 13:24:06 UTC
[3/9] flink git commit: [FLINK-2637] [api-breaking] [types] Adds
equals and hashCode method to TypeInformations and TypeSerializers - Fixes
ObjectArrayTypeInfo - Makes CompositeTypes serializable - Adds test for
equality relation's symmetric property - A
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfoTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfoTest.java
new file mode 100644
index 0000000..2e98a8f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfoTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing;
+
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+import static org.junit.Assert.*;
+
+public class StreamWindowTypeInfoTest extends TestLogger {
+
+ public static class TestClass{}
+
+ @Test
+ public void testStreamWindowTypeInfoEquality() {
+ StreamWindowTypeInfo<TestClass> tpeInfo1 = new StreamWindowTypeInfo<>(new GenericTypeInfo<>(TestClass.class));
+ StreamWindowTypeInfo<TestClass> tpeInfo2 = new StreamWindowTypeInfo<>(new GenericTypeInfo<>(TestClass.class));
+
+ assertEquals(tpeInfo1, tpeInfo2);
+ assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
+ }
+
+ @Test
+ public void testStreamWindowTypeInfoInequality() {
+ StreamWindowTypeInfo<TestClass> tpeInfo1 = new StreamWindowTypeInfo<>(new GenericTypeInfo<>(TestClass.class));
+ StreamWindowTypeInfo<TestClass> tpeInfo2 = new StreamWindowTypeInfo<>(new PojoTypeInfo<>(TestClass.class, new ArrayList<PojoField>()));
+
+ assertNotEquals(tpeInfo1, tpeInfo2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
index 6a9cbfe..dd598ab 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
@@ -21,8 +21,9 @@ import java.util
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor
-import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator, TypeSerializer}
+import org.apache.flink.api.common.typeutils.CompositeType.{TypeComparatorBuilder,
+FlatFieldDescriptor}
+import org.apache.flink.api.common.typeutils.{CompositeType, TypeSerializer}
/**
* A TypeInformation that is used to rename fields of an underlying CompositeType. This
@@ -30,8 +31,9 @@ import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator, Typ
* that does not get translated to a runtime operator.
*/
class RenamingProxyTypeInfo[T](
- tpe: CompositeType[T],
- fieldNames: Array[String]) extends CompositeType[T](tpe.getTypeClass) {
+ val tpe: CompositeType[T],
+ val fieldNames: Array[String])
+ extends CompositeType[T](tpe.getTypeClass) {
def getUnderlyingType: CompositeType[T] = tpe
@@ -86,16 +88,6 @@ class RenamingProxyTypeInfo[T](
executionConfig: ExecutionConfig) =
tpe.createComparator(logicalKeyFields, orders, logicalFieldOffset, executionConfig)
- // These are never called since we override create comparator
- override protected def initializeNewComparator(localKeyCount: Int): Unit =
- throw new RuntimeException("Cannot happen.")
-
- override protected def getNewComparator(executionConfig: ExecutionConfig): TypeComparator[T] =
- throw new RuntimeException("Cannot happen.")
-
- override protected def addCompareField(fieldId: Int, comparator: TypeComparator[_]): Unit =
- throw new RuntimeException("Cannot happen.")
-
override def getFlatFields(
fieldExpression: String,
offset: Int,
@@ -106,4 +98,27 @@ class RenamingProxyTypeInfo[T](
override def getTypeAt[X](fieldExpression: String): TypeInformation[X] = {
tpe.getTypeAt(fieldExpression)
}
+
+ override protected def createTypeComparatorBuilder(): TypeComparatorBuilder[T] = {
+ throw new RuntimeException("This method should never be called because createComparator is " +
+ "overwritten.")
+ }
+
+ override def equals(obj: Any): Boolean = {
+ obj match {
+ case renamingProxy: RenamingProxyTypeInfo[_] =>
+ renamingProxy.canEqual(this) &&
+ tpe.equals(renamingProxy.tpe) &&
+ fieldNames.sameElements(renamingProxy.fieldNames)
+ case _ => false
+ }
+ }
+
+ override def hashCode(): Int = {
+ 31 * tpe.hashCode() + util.Arrays.hashCode(fieldNames.asInstanceOf[Array[AnyRef]])
+ }
+
+ override def canEqual(obj: Any): Boolean = {
+ obj.isInstanceOf[RenamingProxyTypeInfo[_]]
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
index 527e2b4..02219c7 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
@@ -28,7 +28,7 @@ import org.apache.flink.core.memory.{DataInputView, DataOutputView}
/**
* Serializer for [[Row]].
*/
-class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
+class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]])
extends TypeSerializer[Row] {
private def getFieldSerializers = fieldSerializers
@@ -150,9 +150,17 @@ class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
override def equals(any: scala.Any): Boolean = {
any match {
case otherRS: RowSerializer =>
- val otherFieldSerializers = otherRS.getFieldSerializers.asInstanceOf[Array[AnyRef]]
- util.Arrays.deepEquals(fieldSerializers.asInstanceOf[Array[AnyRef]], otherFieldSerializers)
+ otherRS.canEqual(this) &&
+ fieldSerializers.sameElements(otherRS.fieldSerializers)
case _ => false
}
}
+
+ override def canEqual(obj: scala.Any): Boolean = {
+ obj.isInstanceOf[RowSerializer]
+ }
+
+ override def hashCode(): Int = {
+ util.Arrays.hashCode(fieldSerializers.asInstanceOf[Array[AnyRef]])
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
new file mode 100644
index 0000000..9c62a51
--- /dev/null
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class PojoGroupingITCase extends MultipleProgramsTestBase {
+
+ public PojoGroupingITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Test
+ public void testPojoGrouping() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<String, Double, String>> data = env.fromElements(
+ new Tuple3<String, Double, String>("A", 23.0, "Z"),
+ new Tuple3<String, Double, String>("A", 24.0, "Y"),
+ new Tuple3<String, Double, String>("B", 1.0, "Z"));
+
+ TableEnvironment tableEnv = new TableEnvironment();
+
+ Table table = tableEnv
+ .fromDataSet(data, "groupMe, value, name")
+ .select("groupMe, value, name")
+ .where("groupMe != 'B'");
+
+ DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, MyPojo.class);
+
+ DataSet<MyPojo> result = myPojos.groupBy("groupMe")
+ .sortGroup("value", Order.DESCENDING)
+ .first(1);
+ List<MyPojo> resultList = result.collect();
+
+ compareResultAsText(resultList, "A,24.0,Y");
+ }
+
+ public static class MyPojo implements Serializable {
+ private static final long serialVersionUID = 8741918940120107213L;
+
+ public String groupMe;
+ public double value;
+ public String name;
+
+ public MyPojo() {
+ // for serialization
+ }
+
+ public MyPojo(String groupMe, double value, String name) {
+ this.groupMe = groupMe;
+ this.value = value;
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return groupMe + "," + value + "," + name;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala
new file mode 100644
index 0000000..ef616a9
--- /dev/null
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeinfo
+
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.util.TestLogger
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+class RenamingProxyTypeInfoTest extends TestLogger with JUnitSuiteLike {
+
+ @Test
+ def testRenamingProxyTypeEquality(): Unit = {
+ val pojoTypeInfo1 = TypeExtractor.createTypeInfo(classOf[TestPojo])
+ .asInstanceOf[CompositeType[TestPojo]]
+
+ val tpeInfo1 = new RenamingProxyTypeInfo[TestPojo](
+ pojoTypeInfo1,
+ Array("someInt", "aString", "doubleArray"))
+
+ val tpeInfo2 = new RenamingProxyTypeInfo[TestPojo](
+ pojoTypeInfo1,
+ Array("someInt", "aString", "doubleArray"))
+
+ assert(tpeInfo1.equals(tpeInfo2))
+ assert(tpeInfo1.hashCode() == tpeInfo2.hashCode())
+ }
+
+ @Test
+ def testRenamingProxyTypeInequality(): Unit = {
+ val pojoTypeInfo1 = TypeExtractor.createTypeInfo(classOf[TestPojo])
+ .asInstanceOf[CompositeType[TestPojo]]
+
+ val tpeInfo1 = new RenamingProxyTypeInfo[TestPojo](
+ pojoTypeInfo1,
+ Array("someInt", "aString", "doubleArray"))
+
+ val tpeInfo2 = new RenamingProxyTypeInfo[TestPojo](
+ pojoTypeInfo1,
+ Array("foobar", "aString", "doubleArray"))
+
+ assert(!tpeInfo1.equals(tpeInfo2))
+ }
+}
+
+final class TestPojo {
+ var someInt: Int = 0
+ private var aString: String = null
+ var doubleArray: Array[Double] = null
+
+ def setaString(aString: String) {
+ this.aString = aString
+ }
+
+ def getaString: String = {
+ return aString
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
index cab6ab7..bc53a60 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
@@ -18,40 +18,35 @@
package org.apache.flink.api.scala.operators
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
import org.junit._
-import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.apache.flink.api.scala._
+import scala.collection.mutable.ArrayBuffer
+
@RunWith(classOf[Parameterized])
class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
+ private var result: Seq[String] = null
private var expected: String = null
- private val _tempFolder = new TemporaryFolder()
-
- private final val FULL_TUPLE_3_STRING: String = "1,1,Hi\n" + "2,2,Hello\n" + "3,2," +
- "Hello world\n" + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3," +
- "Luke Skywalker\n" + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4," +
- "Comment#4\n" + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5," +
- "Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6," +
- "Comment#12\n" + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
- @Rule
- def tempFolder = _tempFolder
+ private final val FULL_TUPLE_3_STRING: String = "(1,1,Hi)\n" + "(2,2,Hello)\n" + "(3,2," +
+ "Hello world)\n" + "(4,3,Hello world, how are you?)\n" + "(5,3,I am fine.)\n" + "(6,3," +
+ "Luke Skywalker)\n" + "(7,4,Comment#1)\n" + "(8,4,Comment#2)\n" + "(9,4,Comment#3)\n" +
+ "(10,4," +
+ "Comment#4)\n" + "(11,5,Comment#5)\n" + "(12,5,Comment#6)\n" + "(13,5,Comment#7)\n" + "(14,5," +
+ "Comment#8)\n" + "(15,5,Comment#9)\n" + "(16,6,Comment#10)\n" + "(17,6,Comment#11)\n" +
+ "(18,6," +
+ "Comment#12)\n" + "(19,6,Comment#13)\n" + "(20,6,Comment#14)\n" + "(21,6,Comment#15)\n"
- @Before
- def before(): Unit = {
- resultPath = tempFolder.newFile().toURI.toString
- }
@After
def after(): Unit = {
- TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
+ import collection.JavaConverters._
+ TestBaseUtils.compareResultAsText(ArrayBuffer(result: _*).asJava, expected)
}
@Test
@@ -62,8 +57,7 @@ class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env)
val unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env))
- unionDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- env.execute()
+ result = unionDs.collect().map(_.toString)
expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING
}
@@ -79,8 +73,7 @@ class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode
.union(CollectionDataSets.get3TupleDataSet(env))
.union(CollectionDataSets.get3TupleDataSet(env))
.union(CollectionDataSets.get3TupleDataSet(env))
- unionDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- env.execute()
+ result = unionDs.collect().map(_.toString)
expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING +
FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING
}
@@ -94,8 +87,21 @@ class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode
// Don't know how to make an empty result in an other way than filtering it
val empty = CollectionDataSets.get3TupleDataSet(env).filter( t => false )
val unionDs = CollectionDataSets.get3TupleDataSet(env).union(empty)
- unionDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- env.execute()
+ result = unionDs.collect().map(_.toString())
expected = FULL_TUPLE_3_STRING
}
+
+ @Test
+ def testUnionWithOptionType(): Unit = {
+ /*
+ * Union of a tuple with an Option field
+ */
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val data = Seq((Some(1), 1), (None, -1), (Some(42), 42))
+ val input1 = env.fromCollection(data)
+ val input2 = env.fromCollection(data)
+
+ result = input1.union(input2).collect().map(_.toString())
+ expected = data ++ data mkString("\n")
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
index 8460bbc..43c35f9 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
@@ -340,7 +340,7 @@ class TypeInformationGenTest {
Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
Assert.assertEquals(
classOf[CustomType],
- ti.asInstanceOf[ObjectArrayTypeInfo[_, _]].getComponentType)
+ ti.asInstanceOf[ObjectArrayTypeInfo[_, _]].getComponentInfo.getTypeClass)
}
@Test