You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/27 21:08:00 UTC

incubator-flink git commit: [scala] [runtime] Add extra tests for sorting of case classes

Repository: incubator-flink
Updated Branches:
  refs/heads/master 45fb6d823 -> a9b7cbe02


[scala] [runtime] Add extra tests for sorting of case classes


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a9b7cbe0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a9b7cbe0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a9b7cbe0

Branch: refs/heads/master
Commit: a9b7cbe0206423f4a2c22ceab225e5d9812d9bfa
Parents: 45fb6d8
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 27 19:47:12 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 27 20:42:55 2014 +0100

----------------------------------------------------------------------
 .../api/common/typeutils/CompositeType.java     |   3 +-
 .../runtime/TupleComparatorILD2Test.java        |  16 +-
 .../runtime/TupleComparatorILDXC2Test.java      |  16 +-
 .../scala/runtime/CaseClassComparatorTest.scala | 158 +++++++++++++++++++
 4 files changed, 175 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a9b7cbe0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
index 51ad548..0a2f1b0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
  * Type Information for Tuple and Pojo types
  * 
  * The class is taking care of serialization and comparators for Tuples as well.
- * See @see {@link Keys} class for fields setup.
  */
 public abstract class CompositeType<T> extends TypeInformation<T> {
 	
@@ -66,7 +65,7 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
 	/**
 	 * Generic implementation of the comparator creation. Composite types are supplying the infrastructure
 	 * to create the actual comparators
-	 * @return
+	 * @return The comparator
 	 */
 	public TypeComparator<T> createComparator(int[] logicalKeyFields, boolean[] orders, int logicalFieldOffset) {
 		initializeNewComparator(logicalKeyFields.length);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a9b7cbe0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
index 6b4bfb1..9f1c7b8 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILD2Test.java
@@ -35,14 +35,14 @@ public class TupleComparatorILD2Test extends TupleComparatorTestBase<Tuple3<Inte
 
 	@SuppressWarnings("unchecked")
 	Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{
-		new Tuple3<Integer, Long, Double>(4, Long.valueOf(14), 20.0),
-		new Tuple3<Integer, Long, Double>(4, Long.valueOf(15), 23.2),
-		new Tuple3<Integer, Long, Double>(5, Long.valueOf(15), 20.0),
-		new Tuple3<Integer, Long, Double>(5, Long.valueOf(20), 20.0),
-		new Tuple3<Integer, Long, Double>(6, Long.valueOf(20), 23.2),
-		new Tuple3<Integer, Long, Double>(6, Long.valueOf(29), 20.0),
-		new Tuple3<Integer, Long, Double>(7, Long.valueOf(29), 20.0),
-		new Tuple3<Integer, Long, Double>(7, Long.valueOf(34), 23.2)
+		new Tuple3<Integer, Long, Double>(4, 14L, 20.0),
+		new Tuple3<Integer, Long, Double>(4, 15L, 23.2),
+		new Tuple3<Integer, Long, Double>(5, 15L, 20.0),
+		new Tuple3<Integer, Long, Double>(5, 20L, 20.0),
+		new Tuple3<Integer, Long, Double>(6, 20L, 23.2),
+		new Tuple3<Integer, Long, Double>(6, 29L, 20.0),
+		new Tuple3<Integer, Long, Double>(7, 29L, 20.0),
+		new Tuple3<Integer, Long, Double>(7, 34L, 23.2)
 	};
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a9b7cbe0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java
index 8180b1e..2a5fc29 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorILDXC2Test.java
@@ -35,14 +35,14 @@ public class TupleComparatorILDXC2Test extends TupleComparatorTestBase<Tuple3<In
 
 	@SuppressWarnings("unchecked")
 	Tuple3<Integer, Long, Double>[] dataISD = new Tuple3[]{
-		new Tuple3<Integer, Long, Double>(4, Long.valueOf(4), 20.0),
-		new Tuple3<Integer, Long, Double>(4, Long.valueOf(5), 20.0),
-		new Tuple3<Integer, Long, Double>(4, Long.valueOf(3), 23.0),
-		new Tuple3<Integer, Long, Double>(4, Long.valueOf(19), 23.0),
-		new Tuple3<Integer, Long, Double>(4, Long.valueOf(17), 24.0),
-		new Tuple3<Integer, Long, Double>(4, Long.valueOf(18), 24.0),
-		new Tuple3<Integer, Long, Double>(4, Long.valueOf(24), 25.0),
-		new Tuple3<Integer, Long, Double>(4, Long.valueOf(25), 25.0)
+		new Tuple3<Integer, Long, Double>(4, 4L, 20.0),
+		new Tuple3<Integer, Long, Double>(4, 5L, 20.0),
+		new Tuple3<Integer, Long, Double>(4, 3L, 23.0),
+		new Tuple3<Integer, Long, Double>(4, 19L, 23.0),
+		new Tuple3<Integer, Long, Double>(4, 17L, 24.0),
+		new Tuple3<Integer, Long, Double>(4, 18L, 24.0),
+		new Tuple3<Integer, Long, Double>(4, 24L, 25.0),
+		new Tuple3<Integer, Long, Double>(4, 25L, 25.0)
 	};
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a9b7cbe0/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
new file mode 100644
index 0000000..24dbfe5
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.runtime
+
+import org.junit.Test
+import org.junit.Assert._
+import org.apache.flink.api.scala._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.core.memory.DataInputView
+import java.io.IOException
+import org.apache.flink.api.common.typeutils.TypeComparator
+import com.amazonaws.services.sqs.model.UnsupportedOperationException
+import org.apache.flink.core.memory.MemorySegment
+import org.apache.flink.core.memory.DataOutputView
+import org.mockito.Mockito
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter
+import java.util.List
+import java.util.ArrayList
+import org.apache.flink.runtime.operators.sort.QuickSort
+import java.util.Random
+
+class CaseClassComparatorTest {
+  // Record type under test; fields a and c (positions 0 and 2) serve as sort keys.
+  case class CaseTestClass(a: Int, b: Int, c: Int, d: String)
+  /** Sorts random records; the wrapped comparator throws if serialized data is compared. */
+  @Test
+  def testNormalizedKeyGeneration(): Unit = {
+    try {
+      
+      val typeInfo = implicitly[TypeInformation[CaseTestClass]]
+                                     .asInstanceOf[CompositeType[CaseTestClass]]
+      // key fields 0 and 2, both with order flag 'true', no logical field offset
+      val serializer = typeInfo.createSerializer()
+      val comparator = new FailingCompareDeserializedWrapper(
+          typeInfo.createComparator(Array[Int](0, 2), Array[Boolean](true, true), 0))
+      // the two Int keys are expected to give an 8 byte key that is not just a prefix
+      assertTrue(comparator.supportsNormalizedKey())
+      assertEquals(8, comparator.getNormalizeKeyLen())
+      assertFalse(comparator.isNormalizedKeyPrefixOnly(8))
+      
+      // validate the failing mock
+      {
+        val in1 : DataInputView = Mockito.mock(classOf[DataInputView])
+        val in2 : DataInputView = Mockito.mock(classOf[DataInputView])
+        
+        try {
+          comparator.compareSerialized(in1, in2)
+          fail("should throw an exception")
+        }
+        catch {
+          case e: java.lang.UnsupportedOperationException => // fine
+          case ee: Exception => fail("unexpected exception")
+        }
+      }
+      
+      // 20 memory segments of 32 KiB each are handed to the sorter
+      val numMemSegs = 20
+      val memory : List[MemorySegment] = new ArrayList[MemorySegment](numMemSegs)
+      for (i <- 1 to numMemSegs) {
+        memory.add(new MemorySegment(new Array[Byte](32*1024)))
+      }
+      
+      val sorter : NormalizedKeySorter[CaseTestClass] = new NormalizedKeySorter[CaseTestClass](
+               serializer, comparator, memory)
+      
+      val rnd = new Random()
+      var moreToGo = true
+      var num = 0
+      // keep writing random records until write() returns false
+      while (moreToGo) {
+        val next = CaseTestClass(rnd.nextInt(), rnd.nextInt(), rnd.nextInt(), "")
+        moreToGo = sorter.write(next)
+        num += 1
+      }
+      // num also counts the final write that returned false, hence the bound of 1
+      assertTrue("no record was accepted by the sorter", num > 1)
+      // any exception thrown during the sort (e.g. by the wrapper) fails the test below
+      new QuickSort().sort(sorter)
+    }
+    catch {
+      case e: Exception => {
+        e.printStackTrace()
+        fail(e.getMessage())
+      }
+    }
+  }
+  /** Delegating comparator; compareSerialized and compareAgainstReference always throw. */
+  class FailingCompareDeserializedWrapper[T](wrapped: TypeComparator[T]) extends TypeComparator[T] {
+    // everything not mentioned otherwise is forwarded unchanged to the wrapped comparator
+    def hash(record: T) : Int = wrapped.hash(record)
+
+    def setReference(toCompare: T): Unit = wrapped.setReference(toCompare)
+
+    def equalToReference(candidate: T): Boolean = wrapped.equalToReference(candidate)
+    
+    def compareToReference(referencedComparator: TypeComparator[T]): Int =
+      wrapped.compareToReference(referencedComparator)
+    
+    override def supportsCompareAgainstReference(): Boolean =
+      wrapped.supportsCompareAgainstReference()
+    
+    def compare(first: T, second: T): Int = wrapped.compare(first, second)
+    // deliberately unsupported; uses the JDK exception, not the AWS SDK class of the same name
+    def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int = {
+      throw new java.lang.UnsupportedOperationException("Not Supported")
+    }
+    
+    def supportsNormalizedKey(): Boolean = wrapped.supportsNormalizedKey()
+    
+    def supportsSerializationWithKeyNormalization(): Boolean =
+      wrapped.supportsSerializationWithKeyNormalization()
+    
+    def getNormalizeKeyLen(): Int = wrapped.getNormalizeKeyLen()
+    
+    def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean =
+      wrapped.isNormalizedKeyPrefixOnly(keyBytes)
+    
+    def putNormalizedKey(record: T, target: MemorySegment, offset: Int, numBytes: Int): Unit =
+      wrapped.putNormalizedKey(record, target, offset, numBytes)
+    
+    def writeWithKeyNormalization(record: T, target: DataOutputView): Unit =
+      wrapped.writeWithKeyNormalization(record, target)
+    
+    def readWithKeyDenormalization(reuse: T, source: DataInputView): T =
+      wrapped.readWithKeyDenormalization(reuse, source)
+    
+    def invertNormalizedKey(): Boolean = wrapped.invertNormalizedKey()
+    // duplicates are wrapped again so that they fail in the same way
+    def duplicate(): TypeComparator[T] = new FailingCompareDeserializedWrapper(wrapped.duplicate())
+    
+    def extractKeys(record: Object, target: Array[Object], index: Int): Int =
+      wrapped.extractKeys(record, target, index)
+    
+    def getFlatComparators(): Array[TypeComparator[_]] = wrapped.getFlatComparators()
+    // NOTE(review): why this override is needed is not visible here; it always throws
+    override def compareAgainstReference(keys: Array[Comparable[_]]): Int = {
+      throw new java.lang.UnsupportedOperationException("Workaround hack.")
+    }
+  }
+}