You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/27 18:55:56 UTC

[3/5] incubator-flink git commit: [FLINK-1273] [runtime] Add Void type to basic types

[FLINK-1273] [runtime] Add Void type to basic types

Add optional test for external sorting of case classes.
Fix various warnings.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d554faa3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d554faa3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d554faa3

Branch: refs/heads/master
Commit: d554faa33d285c4c27aefbc601a55e48beead81f
Parents: 17bc479
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 27 14:22:35 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 27 18:18:26 2014 +0100

----------------------------------------------------------------------
 .../api/common/typeinfo/BasicTypeInfo.java      |  14 +-
 .../common/typeutils/base/EnumComparator.java   |   4 +-
 .../common/typeutils/base/EnumSerializer.java   |   2 +-
 .../typeutils/base/GenericArraySerializer.java  |   6 +-
 .../common/typeutils/base/VoidSerializer.java   |  85 +++++++
 .../api/common/io/SequentialFormatTestBase.java |   4 +-
 .../api/common/io/SerializedFormatTest.java     |   2 +-
 flink-dist/pom.xml                              |   2 +-
 .../misc/MassiveCaseClassSortingITCase.scala    | 225 +++++++++++++++++++
 9 files changed, 334 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d554faa3/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index f27da07..61d830a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -23,6 +23,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.BooleanComparator;
@@ -45,10 +46,11 @@ import org.apache.flink.api.common.typeutils.base.ShortComparator;
 import org.apache.flink.api.common.typeutils.base.ShortSerializer;
 import org.apache.flink.api.common.typeutils.base.StringComparator;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 
 
 /**
- *
+ * Type information for primitive types (int, long, double, byte, ...), String, Date, and Void.
  */
 public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
 
@@ -62,6 +64,7 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
 	public static final BasicTypeInfo<Double> DOUBLE_TYPE_INFO = new BasicTypeInfo<Double>(Double.class, DoubleSerializer.INSTANCE, DoubleComparator.class);
 	public static final BasicTypeInfo<Character> CHAR_TYPE_INFO = new BasicTypeInfo<Character>(Character.class, CharSerializer.INSTANCE, CharComparator.class);
 	public static final BasicTypeInfo<Date> DATE_TYPE_INFO = new BasicTypeInfo<Date>(Date.class, DateSerializer.INSTANCE, DateComparator.class);
+	public static final BasicTypeInfo<Void> VOID_TYPE_INFO = new BasicTypeInfo<Void>(Void.class, VoidSerializer.INSTANCE, null);
 	
 	// --------------------------------------------------------------------------------------------
 
@@ -117,7 +120,11 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
 	
 	@Override
 	public TypeComparator<T> createComparator(boolean sortOrderAscending) {
-		return instantiateComparator(comparatorClass, sortOrderAscending);
+		if (comparatorClass != null) {
+			return instantiateComparator(comparatorClass, sortOrderAscending);
+		} else {
+			throw new InvalidTypesException("The type " + clazz.getSimpleName() + " cannot be used as a key.");
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -150,6 +157,7 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
 			throw new NullPointerException();
 		}
 		
+		@SuppressWarnings("unchecked")
 		BasicTypeInfo<X> info = (BasicTypeInfo<X>) TYPES.get(type);
 		return info;
 	}
@@ -185,5 +193,7 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
 		TYPES.put(Character.class, CHAR_TYPE_INFO);
 		TYPES.put(char.class, CHAR_TYPE_INFO);
 		TYPES.put(Date.class, DATE_TYPE_INFO);
+		TYPES.put(Void.class, VOID_TYPE_INFO);
+		TYPES.put(void.class, VOID_TYPE_INFO);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d554faa3/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumComparator.java
index ed40bd4..bbb2f40 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumComparator.java
@@ -79,7 +79,7 @@ public final class EnumComparator<T extends Enum<T>> extends BasicTypeComparator
 	}
 
 	@Override
-	public EnumComparator duplicate() {
-		return new EnumComparator(ascendingComparison);
+	public EnumComparator<T> duplicate() {
+		return new EnumComparator<T>(ascendingComparison);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d554faa3/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
index a99fbf5..7ecf82a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
@@ -93,7 +93,7 @@ public final class EnumSerializer<T extends Enum<T>> extends TypeSerializer<T> {
 	@Override
 	public boolean equals(Object obj) {
 		if(obj instanceof EnumSerializer) {
-			EnumSerializer other = (EnumSerializer) obj;
+			EnumSerializer<?> other = (EnumSerializer<?>) obj;
 			return other.enumClass == this.enumClass;
 		} else {
 			return false;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d554faa3/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index 9d616e2..c72132d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-
 /**
  * A serializer for arrays of objects.
  * 
@@ -163,4 +162,9 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 			return false;
 		}
 	}
+	
+	@Override
+	public String toString() {
+		return "Serializer " + componentClass.getName() + "[]";
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d554faa3/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
new file mode 100644
index 0000000..33bb901
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/** Serializer for {@link Void}: every value is {@code null} and is encoded as a single byte. */
+public final class VoidSerializer extends TypeSerializerSingleton<Void> {
+
+	/** The shared instance of this serializer. */
+	public static final VoidSerializer INSTANCE = new VoidSerializer();
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public boolean isImmutableType() {
+		return true;
+	}
+
+	@Override
+	public boolean isStateful() {
+		return false;
+	}
+
+	@Override
+	public int getLength() {
+		return 1;
+	}
+
+	@Override
+	public Void createInstance() {
+		return null;
+	}
+
+	@Override
+	public Void copy(Void from) {
+		return null;
+	}
+
+	@Override
+	public Void copy(Void from, Void reuse) {
+		return copy(from);
+	}
+
+	@Override
+	public void serialize(Void record, DataOutputView target) throws IOException {
+		// the value carries no data; one byte is written so that the stream advances
+		target.write(0);
+	}
+
+	@Override
+	public Void deserialize(DataInputView source) throws IOException {
+		source.readByte();
+		return null;
+	}
+
+	@Override
+	public Void deserialize(Void reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		target.write(source.readByte());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d554faa3/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
index 7afd3b4..8c4e090 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
@@ -179,7 +179,7 @@ public abstract class SequentialFormatTestBase<T> {
 		Configuration configuration = new Configuration();
 		configuration.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);
 		if (this.degreeOfParallelism == 1) {
-			BinaryOutputFormat output = createOutputFormat(this.tempFile.toURI().toString(),
+			BinaryOutputFormat<T> output = createOutputFormat(this.tempFile.toURI().toString(),
 					configuration);
 			for (int index = 0; index < this.numberOfTuples; index++) {
 				output.writeRecord(this.getRecord(index));
@@ -190,7 +190,7 @@ public abstract class SequentialFormatTestBase<T> {
 			this.tempFile.mkdir();
 			int recordIndex = 0;
 			for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) {
-				BinaryOutputFormat output = createOutputFormat(this.tempFile.toURI() + "/" +
+				BinaryOutputFormat<T> output = createOutputFormat(this.tempFile.toURI() + "/" +
 						(fileIndex+1), configuration);
 				for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) {
 					output.writeRecord(this.getRecord(recordIndex));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d554faa3/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
index 6f4fb58..90347b8 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
@@ -57,7 +57,7 @@ public class SerializedFormatTest extends SequentialFormatTestBase<Record> {
 	}
 
 
-	@SuppressWarnings("unchecked")
+	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Override
 	protected BinaryOutputFormat<Record> createOutputFormat(String path, Configuration
 			configuration) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d554faa3/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 29b9a61..4283ac1 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -163,7 +163,7 @@ under the License.
 				<plugins>
 					<plugin>
 						<artifactId>maven-assembly-plugin</artifactId>
-						<version>2.4</version>
+						<version>2.4</version><!--$NO-MVN-MAN-VER$-->
 						<executions>
 							<!--  Uber-jar -->
 							<execution>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d554faa3/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
new file mode 100644
index 0000000..d09fe60
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.misc
+
+import java.io.File
+import java.util.Random
+import java.io.BufferedWriter
+import java.io.FileWriter
+import org.apache.flink.api.scala._
+import java.io.BufferedReader
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync
+import java.io.FileReader
+import org.apache.flink.util.MutableObjectIterator
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.runtime.operators.sort.UnilateralSortMerger
+import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory
+import org.junit.Assert._;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
+
+/**
+ * Optional stress test that sorts a large file of [[StringTuple]] case class records with
+ * Flink's external merge sorter and compares the result against the system `sort` command.
+ */
+class MassiveCaseClassSortingITCase {
+
+  val SEED : Long = 347569784659278346L
+
+  /**
+   * Generates three million random records, sorts them once with `sort` (through `/bin/bash`)
+   * and once with a [[UnilateralSortMerger]], then checks both outputs record by record.
+   * Any exception fails the test; the temporary files are deleted in all cases.
+   */
+  def testStringTuplesSorting(): Unit = {
+    val NUM_STRINGS = 3000000
+    var input: File = null
+    var sorted: File = null
+
+    try {
+      input = generateFileWithStringTuples(NUM_STRINGS,
+                                           "http://some-uri.com/that/is/a/common/prefix/to/all")
+      sorted = File.createTempFile("sorted_strings", "txt")
+
+      // reference result: the input lines sorted by the system sort tool under the C locale
+      val command = Array("/bin/bash", "-c", "export LC_ALL=\"C\" && cat \""
+                        + input.getAbsolutePath + "\" | sort > \"" + sorted.getAbsolutePath + "\"")
+
+      var p: Process = null
+      try {
+        p = Runtime.getRuntime.exec(command)
+        val retCode = p.waitFor()
+        if (retCode != 0) {
+          throw new Exception("Command failed with return code " + retCode)
+        }
+        p = null
+      }
+      finally {
+        if (p != null) {
+          p.destroy()
+        }
+      }
+
+      // created outside the inner try so that they can be shut down in its finally block
+      val mm = new DefaultMemoryManager(1024 * 1024, 1)
+      val ioMan = new IOManagerAsync()
+      var sorter: UnilateralSortMerger[StringTuple] = null
+      var reader: BufferedReader = null
+      var verifyReader: BufferedReader = null
+
+      try {
+        reader = new BufferedReader(new FileReader(input))
+        val inputIterator = new StringTupleReader(reader)
+
+        val typeInfo = implicitly[TypeInformation[StringTuple]]
+                                             .asInstanceOf[CompositeType[StringTuple]]
+        val serializer = typeInfo.createSerializer()
+        val comparator = typeInfo.createComparator(Array(0, 1), Array(true, true), 0)
+        sorter = new UnilateralSortMerger[StringTuple](mm, ioMan, inputIterator,
+              new DummyInvokable(),
+              new RuntimeStatelessSerializerFactory[StringTuple](serializer, classOf[StringTuple]),
+              comparator, 1.0, 4, 0.8f)
+        val sortedData = sorter.getIterator
+        reader.close()
+
+        verifyReader = new BufferedReader(new FileReader(sorted))
+        val verifyIterator = new StringTupleReader(verifyReader)
+        var num = 0
+        var hasMore = true
+        while (hasMore) {
+          val next = verifyIterator.next(null)
+          if (next != null) {
+            num += 1
+            val nextFromFlinkSort = sortedData.next(null)
+            assertNotNull(nextFromFlinkSort)
+            assertEquals(next.key1, nextFromFlinkSort.key1)
+            assertEquals(next.key2, nextFromFlinkSort.key2)
+            // assert array equals does not work here
+            assertEquals(next.value.length, nextFromFlinkSort.value.length)
+            for (i <- 0 until next.value.length) {
+              assertEquals(next.value(i), nextFromFlinkSort.value(i))
+            }
+          }
+          else {
+            hasMore = false
+          }
+        }
+
+        assertNull(sortedData.next(null))
+        assertEquals(NUM_STRINGS, num)
+      }
+      finally {
+        if (reader != null) {
+          reader.close()
+        }
+        if (verifyReader != null) {
+          verifyReader.close()
+        }
+        if (sorter != null) {
+          sorter.close()
+        }
+        // release the I/O manager and the memory only after the sorter has been closed
+        ioMan.shutdown()
+        mm.shutdown()
+      }
+    }
+    catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        // report the failure instead of silently swallowing the exception
+        fail(e.getMessage)
+      }
+    }
+    finally {
+      if (input != null) {
+        input.delete()
+      }
+      if (sorted != null) {
+        sorted.delete()
+      }
+    }
+  }
+
+  /**
+   * Writes `numStrings` lines to a new temporary file. Each line consists of 2 to 6
+   * space-separated components, each being `prefix` plus 10 to 29 random characters.
+   *
+   * @return the generated file
+   */
+  private def generateFileWithStringTuples(numStrings: Int, prefix: String): File = {
+    val rnd = new Random(SEED)
+    val bld = new StringBuilder()
+    val f = File.createTempFile("strings", "txt")
+    val wrt = new BufferedWriter(new FileWriter(f))
+
+    try {
+      for (i <- 0 until numStrings) {
+        bld.setLength(0)
+        val numComps = rnd.nextInt(5) + 2
+        for (z <- 0 until numComps) {
+          if (z > 0) {
+            bld.append(' ')
+          }
+          bld.append(prefix)
+          val len = rnd.nextInt(20) + 10
+          for (k <- 0 until len) {
+            val c = (rnd.nextInt(80) + 40).toChar
+            bld.append(c)
+          }
+        }
+        val str = bld.toString
+        wrt.write(str)
+        wrt.newLine()
+      }
+    }
+    finally {
+      wrt.close()
+    }
+    f
+  }
+}
+
+object MassiveCaseClassSortingITCase {
+
+  /** Entry point that runs the sorting test directly, without a test runner. */
+  def main(args: Array[String]): Unit =
+    new MassiveCaseClassSortingITCase().testStringTuplesSorting()
+}
+
+case class StringTuple(key1: String, key2: String, value: Array[String]) // sorted by (key1, key2)
+  
+/** Iterator that parses each line of `reader` into a [[StringTuple]]; `null` signals the end. */
+class StringTupleReader(val reader: BufferedReader) extends MutableObjectIterator[StringTuple] {
+  // `reuse` is ignored; a new tuple is created per line from its space-separated fields
+  override def next(reuse: StringTuple): StringTuple =
+    reader.readLine() match {
+      case null => null
+      case line =>
+        val fields = line.split(" ")
+        StringTuple(fields(0), fields(1), fields)
+    }
+}
+
+/** No-op task stub; the test passes it to the sorter as the owning task. */
+class DummyInvokable extends AbstractInvokable {
+  override def registerInputOutput(): Unit = ()
+  override def invoke(): Unit = ()
+}