Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/17 13:24:06 UTC

[3/9] flink git commit: [FLINK-2637] [api-breaking] [types] Adds equals and hashCode method to TypeInformations and TypeSerializers - Fixes ObjectArrayTypeInfo - Makes CompositeTypes serializable - Adds test for equality relation's symmetric property - A
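
Note: the equality pattern this commit rolls out pairs equals/hashCode with a
canEqual guard so that equality stays symmetric across the TypeInformation
class hierarchy. A minimal sketch of the pattern in Scala (MyTypeInfo and
nestedType are illustrative names, not part of the commit):

    import org.apache.flink.api.common.typeinfo.TypeInformation

    // Illustrative sketch only: MyTypeInfo is a hypothetical wrapper, not a
    // Flink class. It shows the equals/hashCode/canEqual triple the commit
    // adds to the real TypeInformation subclasses.
    class MyTypeInfo[T](val nestedType: TypeInformation[T]) {

      // canEqual lets a subclass veto equality with its superclass, which is
      // what keeps a.equals(b) and b.equals(a) in agreement.
      def canEqual(obj: Any): Boolean = obj.isInstanceOf[MyTypeInfo[_]]

      override def equals(obj: Any): Boolean = obj match {
        case other: MyTypeInfo[_] =>
          other.canEqual(this) && nestedType.equals(other.nestedType)
        case _ => false
      }

      // hashCode must agree with equals: equal instances, equal hash codes.
      override def hashCode(): Int = 31 * nestedType.hashCode()
    }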

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfoTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfoTest.java
new file mode 100644
index 0000000..2e98a8f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfoTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing;
+
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+import static org.junit.Assert.*;
+
+public class StreamWindowTypeInfoTest extends TestLogger {
+
+	public static class TestClass{}
+
+	@Test
+	public void testStreamWindowTypeInfoEquality() {
+		StreamWindowTypeInfo<TestClass> tpeInfo1 = new StreamWindowTypeInfo<>(new GenericTypeInfo<>(TestClass.class));
+		StreamWindowTypeInfo<TestClass> tpeInfo2 = new StreamWindowTypeInfo<>(new GenericTypeInfo<>(TestClass.class));
+
+		assertEquals(tpeInfo1, tpeInfo2);
+		assertEquals(tpeInfo1.hashCode(), tpeInfo2.hashCode());
+	}
+
+	@Test
+	public void testStreamWindowTypeInfoInequality() {
+		StreamWindowTypeInfo<TestClass> tpeInfo1 = new StreamWindowTypeInfo<>(new GenericTypeInfo<>(TestClass.class));
+		StreamWindowTypeInfo<TestClass> tpeInfo2 = new StreamWindowTypeInfo<>(new PojoTypeInfo<>(TestClass.class, new ArrayList<PojoField>()));
+
+		assertNotEquals(tpeInfo1, tpeInfo2);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
index 6a9cbfe..dd598ab 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
@@ -21,8 +21,9 @@ import java.util
 
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor
-import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator, TypeSerializer}
+import org.apache.flink.api.common.typeutils.CompositeType.{TypeComparatorBuilder,
+FlatFieldDescriptor}
+import org.apache.flink.api.common.typeutils.{CompositeType, TypeSerializer}
 
 /**
  * A TypeInformation that is used to rename fields of an underlying CompositeType. This
@@ -30,8 +31,9 @@ import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator, Typ
  * that does not get translated to a runtime operator.
  */
 class RenamingProxyTypeInfo[T](
-    tpe: CompositeType[T],
-    fieldNames: Array[String]) extends CompositeType[T](tpe.getTypeClass) {
+    val tpe: CompositeType[T],
+    val fieldNames: Array[String])
+  extends CompositeType[T](tpe.getTypeClass) {
 
   def getUnderlyingType: CompositeType[T] = tpe
 
@@ -86,16 +88,6 @@ class RenamingProxyTypeInfo[T](
         executionConfig: ExecutionConfig) =
     tpe.createComparator(logicalKeyFields, orders, logicalFieldOffset, executionConfig)
 
-  // These are never called since we override create comparator
-  override protected def initializeNewComparator(localKeyCount: Int): Unit =
-    throw new RuntimeException("Cannot happen.")
-
-  override protected def getNewComparator(executionConfig: ExecutionConfig): TypeComparator[T] =
-    throw new RuntimeException("Cannot happen.")
-
-  override protected def addCompareField(fieldId: Int, comparator: TypeComparator[_]): Unit =
-    throw new RuntimeException("Cannot happen.")
-
   override def getFlatFields(
       fieldExpression: String,
       offset: Int,
@@ -106,4 +98,27 @@ class RenamingProxyTypeInfo[T](
   override def getTypeAt[X](fieldExpression: String): TypeInformation[X] = {
     tpe.getTypeAt(fieldExpression)
   }
+
+  override protected def createTypeComparatorBuilder(): TypeComparatorBuilder[T] = {
+    throw new RuntimeException("This method should never be called because createComparator is " +
+      "overwritten.")
+  }
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case renamingProxy: RenamingProxyTypeInfo[_] =>
+        renamingProxy.canEqual(this) &&
+        tpe.equals(renamingProxy.tpe) &&
+        fieldNames.sameElements(renamingProxy.fieldNames)
+      case _ => false
+    }
+  }
+
+  override def hashCode(): Int = {
+    31 * tpe.hashCode() + util.Arrays.hashCode(fieldNames.asInstanceOf[Array[AnyRef]])
+  }
+
+  override def canEqual(obj: Any): Boolean = {
+    obj.isInstanceOf[RenamingProxyTypeInfo[_]]
+  }
 }
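
Note: the canEqual guard added above is the standard Scala recipe for keeping
equals symmetric across a class hierarchy. A self-contained sketch of the
hazard it prevents (Point/ColoredPoint are illustrative names, not Flink
types):

    // Without the guard, a Point could equal a ColoredPoint while the
    // ColoredPoint denies the equality, breaking symmetry.
    object SymmetryDemo extends App {
      class Point(val x: Int, val y: Int) {
        def canEqual(obj: Any): Boolean = obj.isInstanceOf[Point]

        override def equals(obj: Any): Boolean = obj match {
          case p: Point => p.canEqual(this) && x == p.x && y == p.y
          case _ => false
        }

        override def hashCode(): Int = 31 * x + y
      }

      class ColoredPoint(x: Int, y: Int, val color: String)
        extends Point(x, y) {

        override def canEqual(obj: Any): Boolean =
          obj.isInstanceOf[ColoredPoint]

        override def equals(obj: Any): Boolean = obj match {
          case cp: ColoredPoint =>
            cp.canEqual(this) && super.equals(cp) && color == cp.color
          case _ => false
        }
      }

      val p = new Point(1, 2)
      val cp = new ColoredPoint(1, 2, "red")
      // Both directions agree (both false), so the relation stays symmetric.
      assert(p.equals(cp) == cp.equals(p))
    }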

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
index 527e2b4..02219c7 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
@@ -28,7 +28,7 @@ import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 /**
  * Serializer for [[Row]].
  */
-class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
+class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]])
   extends TypeSerializer[Row] {
 
   private def getFieldSerializers = fieldSerializers
@@ -150,9 +150,17 @@ class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
   override def equals(any: scala.Any): Boolean = {
     any match {
       case otherRS: RowSerializer =>
-        val otherFieldSerializers = otherRS.getFieldSerializers.asInstanceOf[Array[AnyRef]]
-        util.Arrays.deepEquals(fieldSerializers.asInstanceOf[Array[AnyRef]], otherFieldSerializers)
+        otherRS.canEqual(this) &&
+        fieldSerializers.sameElements(otherRS.fieldSerializers)
       case _ => false
     }
   }
+
+  override def canEqual(obj: scala.Any): Boolean = {
+    obj.isInstanceOf[RowSerializer]
+  }
+
+  override def hashCode(): Int = {
+    util.Arrays.hashCode(fieldSerializers.asInstanceOf[Array[AnyRef]])
+  }
 }
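
Note: the switch from util.Arrays.deepEquals to sameElements works because
TypeSerializers now define equals/hashCode themselves. A quick sketch of the
difference (plain strings stand in for serializers):

    // sameElements compares arrays element by element via equals, whereas
    // Array.equals (a Java array) is reference equality.
    object SameElementsDemo extends App {
      val a = Array("a", "b", "c")
      val b = Array("a", "b", "c")
      assert(a.sameElements(b))   // element-wise comparison: true
      assert(!a.equals(b))        // reference equality: false, two instances
    }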

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
new file mode 100644
index 0000000..9c62a51
--- /dev/null
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.test;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.TableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class PojoGroupingITCase extends MultipleProgramsTestBase {
+
+	public PojoGroupingITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testPojoGrouping() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<String, Double, String>> data = env.fromElements(
+			new Tuple3<String, Double, String>("A", 23.0, "Z"),
+			new Tuple3<String, Double, String>("A", 24.0, "Y"),
+			new Tuple3<String, Double, String>("B", 1.0, "Z"));
+
+		TableEnvironment tableEnv = new TableEnvironment();
+
+		Table table = tableEnv
+			.fromDataSet(data, "groupMe, value, name")
+			.select("groupMe, value, name")
+			.where("groupMe != 'B'");
+
+		DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, MyPojo.class);
+
+		DataSet<MyPojo> result = myPojos.groupBy("groupMe")
+			.sortGroup("value", Order.DESCENDING)
+			.first(1);
+		List<MyPojo> resultList = result.collect();
+
+		compareResultAsText(resultList, "A,24.0,Y");
+	}
+
+	public static class MyPojo implements Serializable {
+		private static final long serialVersionUID = 8741918940120107213L;
+
+		public String groupMe;
+		public double value;
+		public String name;
+
+		public MyPojo() {
+			// for serialization
+		}
+
+		public MyPojo(String groupMe, double value, String name) {
+			this.groupMe = groupMe;
+			this.value = value;
+			this.name = name;
+		}
+
+		@Override
+		public String toString() {
+			return groupMe + "," + value + "," + name;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala
new file mode 100644
index 0000000..ef616a9
--- /dev/null
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeinfo
+
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.util.TestLogger
+import org.junit.Test
+import org.scalatest.junit.JUnitSuiteLike
+
+class RenamingProxyTypeInfoTest extends TestLogger with JUnitSuiteLike {
+
+  @Test
+  def testRenamingProxyTypeEquality(): Unit = {
+    val pojoTypeInfo1 = TypeExtractor.createTypeInfo(classOf[TestPojo])
+      .asInstanceOf[CompositeType[TestPojo]]
+
+    val tpeInfo1 = new RenamingProxyTypeInfo[TestPojo](
+      pojoTypeInfo1,
+      Array("someInt", "aString", "doubleArray"))
+
+    val tpeInfo2 = new RenamingProxyTypeInfo[TestPojo](
+      pojoTypeInfo1,
+      Array("someInt", "aString", "doubleArray"))
+
+    assert(tpeInfo1.equals(tpeInfo2))
+    assert(tpeInfo1.hashCode() == tpeInfo2.hashCode())
+  }
+
+  @Test
+  def testRenamingProxyTypeInequality(): Unit = {
+    val pojoTypeInfo1 = TypeExtractor.createTypeInfo(classOf[TestPojo])
+      .asInstanceOf[CompositeType[TestPojo]]
+
+    val tpeInfo1 = new RenamingProxyTypeInfo[TestPojo](
+      pojoTypeInfo1,
+      Array("someInt", "aString", "doubleArray"))
+
+    val tpeInfo2 = new RenamingProxyTypeInfo[TestPojo](
+      pojoTypeInfo1,
+      Array("foobar", "aString", "doubleArray"))
+
+    assert(!tpeInfo1.equals(tpeInfo2))
+  }
+}
+
+final class TestPojo {
+  var someInt: Int = 0
+  private var aString: String = null
+  var doubleArray: Array[Double] = null
+
+  def setaString(aString: String) {
+    this.aString = aString
+  }
+
+  def getaString: String = {
+    return aString
+  }
+}
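
Note: the commit message also mentions a test for the equality relation's
symmetric property. A sketch of what such a check could look like for the
types used in this test file (an assumption about the test's shape, not a
copy of it):

    // Illustrative sketch: a proxy and its underlying PojoTypeInfo are
    // distinct types, and canEqual makes both directions agree.
    val pojoTypeInfo = TypeExtractor.createTypeInfo(classOf[TestPojo])
      .asInstanceOf[CompositeType[TestPojo]]
    val proxy = new RenamingProxyTypeInfo[TestPojo](
      pojoTypeInfo,
      Array("someInt", "aString", "doubleArray"))

    // Symmetry: proxy.equals(pojo) and pojo.equals(proxy) must agree.
    assert(proxy.equals(pojoTypeInfo) == pojoTypeInfo.equals(proxy))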

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
index cab6ab7..bc53a60 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
@@ -18,40 +18,35 @@
 package org.apache.flink.api.scala.operators
 
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase}
 import org.junit._
-import org.junit.rules.TemporaryFolder
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
 import org.apache.flink.api.scala._
 
+import scala.collection.mutable.ArrayBuffer
+
 @RunWith(classOf[Parameterized])
 class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-  private var resultPath: String = null
+  private var result: Seq[String] = null
   private var expected: String = null
-  private val _tempFolder = new TemporaryFolder()
-
-  private final val FULL_TUPLE_3_STRING: String = "1,1,Hi\n" + "2,2,Hello\n" + "3,2," +
-    "Hello world\n" + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3," +
-    "Luke Skywalker\n" + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4," +
-    "Comment#4\n" + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5," +
-    "Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6," +
-    "Comment#12\n" + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
 
-  @Rule
-  def tempFolder = _tempFolder
+  private final val FULL_TUPLE_3_STRING: String = "(1,1,Hi)\n" + "(2,2,Hello)\n" + "(3,2," +
+    "Hello world)\n" + "(4,3,Hello world, how are you?)\n" + "(5,3,I am fine.)\n" + "(6,3," +
+    "Luke Skywalker)\n" + "(7,4,Comment#1)\n" + "(8,4,Comment#2)\n" + "(9,4,Comment#3)\n" +
+    "(10,4," +
+    "Comment#4)\n" + "(11,5,Comment#5)\n" + "(12,5,Comment#6)\n" + "(13,5,Comment#7)\n" + "(14,5," +
+    "Comment#8)\n" + "(15,5,Comment#9)\n" + "(16,6,Comment#10)\n" + "(17,6,Comment#11)\n" +
+    "(18,6," +
+    "Comment#12)\n" + "(19,6,Comment#13)\n" + "(20,6,Comment#14)\n" + "(21,6,Comment#15)\n"
 
-  @Before
-  def before(): Unit = {
-    resultPath = tempFolder.newFile().toURI.toString
-  }
 
   @After
   def after(): Unit = {
-    TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
+    import collection.JavaConverters._
+    TestBaseUtils.compareResultAsText(ArrayBuffer(result: _*).asJava, expected)
   }
 
   @Test
@@ -62,8 +57,7 @@ class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env)
     val unionDs = ds.union(CollectionDataSets.get3TupleDataSet(env))
-    unionDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
+    result = unionDs.collect().map(_.toString)
     expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING
   }
 
@@ -79,8 +73,7 @@ class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode
       .union(CollectionDataSets.get3TupleDataSet(env))
       .union(CollectionDataSets.get3TupleDataSet(env))
       .union(CollectionDataSets.get3TupleDataSet(env))
-    unionDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
+    result = unionDs.collect().map(_.toString)
     expected = FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING +
       FULL_TUPLE_3_STRING + FULL_TUPLE_3_STRING
   }
@@ -94,8 +87,21 @@ class UnionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode
     // Don't know how to make an empty result in an other way than filtering it
     val empty = CollectionDataSets.get3TupleDataSet(env).filter( t => false )
     val unionDs = CollectionDataSets.get3TupleDataSet(env).union(empty)
-    unionDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
-    env.execute()
+    result = unionDs.collect().map(_.toString())
     expected = FULL_TUPLE_3_STRING
   }
+
+  @Test
+  def testUnionWithOptionType(): Unit = {
+    /*
+     * Union of a tuple with an Option field
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val data = Seq((Some(1), 1), (None, -1), (Some(42), 42))
+    val input1 = env.fromCollection(data)
+    val input2 = env.fromCollection(data)
+
+    result = input1.union(input2).collect().map(_.toString())
+    expected = data ++ data mkString("\n")
+  }
 }
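
Note: the rewrite above replaces file-based verification (writeAsCsv plus a
line comparison on disk) with in-memory verification via collect(). A minimal
sketch of that pattern, assuming a local execution environment:

    import org.apache.flink.api.scala._

    // collect() triggers execution and returns the results directly, so the
    // test needs no temp files and no explicit env.execute() call.
    object CollectDemo extends App {
      val env = ExecutionEnvironment.getExecutionEnvironment
      val ds = env.fromElements((1, "Hi"), (2, "Hello"))
      val result: Seq[String] = ds.collect().map(_.toString)
      assert(result.sorted == Seq("(1,Hi)", "(2,Hello)"))
    }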

http://git-wip-us.apache.org/repos/asf/flink/blob/8ca853e0/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
index 8460bbc..43c35f9 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
@@ -340,7 +340,7 @@ class TypeInformationGenTest {
     Assert.assertTrue(ti.isInstanceOf[ObjectArrayTypeInfo[_, _]])
     Assert.assertEquals(
       classOf[CustomType],
-      ti.asInstanceOf[ObjectArrayTypeInfo[_, _]].getComponentType)
+      ti.asInstanceOf[ObjectArrayTypeInfo[_, _]].getComponentInfo.getTypeClass)
   }
 
   @Test