You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/27 18:55:56 UTC
[3/5] incubator-flink git commit: [FLINK-1273] [runtime] Add Void type to basic types
[FLINK-1273] [runtime] Add Void type to basic types
Add optional test for external sorting of case classes.
Fix various warnings.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d554faa3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d554faa3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d554faa3
Branch: refs/heads/master
Commit: d554faa33d285c4c27aefbc601a55e48beead81f
Parents: 17bc479
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 27 14:22:35 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 27 18:18:26 2014 +0100
----------------------------------------------------------------------
.../api/common/typeinfo/BasicTypeInfo.java | 14 +-
.../common/typeutils/base/EnumComparator.java | 4 +-
.../common/typeutils/base/EnumSerializer.java | 2 +-
.../typeutils/base/GenericArraySerializer.java | 6 +-
.../common/typeutils/base/VoidSerializer.java | 85 +++++++
.../api/common/io/SequentialFormatTestBase.java | 4 +-
.../api/common/io/SerializedFormatTest.java | 2 +-
flink-dist/pom.xml | 2 +-
.../misc/MassiveCaseClassSortingITCase.scala | 225 +++++++++++++++++++
9 files changed, 334 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d554faa3/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index f27da07..61d830a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -23,6 +23,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
+import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.BooleanComparator;
@@ -45,10 +46,11 @@ import org.apache.flink.api.common.typeutils.base.ShortComparator;
import org.apache.flink.api.common.typeutils.base.ShortSerializer;
import org.apache.flink.api.common.typeutils.base.StringComparator;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
/**
- *
+ * Type information for primitive types (int, long, double, byte, ...), String, Date, and Void.
*/
public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
@@ -62,6 +64,7 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
public static final BasicTypeInfo<Double> DOUBLE_TYPE_INFO = new BasicTypeInfo<Double>(Double.class, DoubleSerializer.INSTANCE, DoubleComparator.class);
public static final BasicTypeInfo<Character> CHAR_TYPE_INFO = new BasicTypeInfo<Character>(Character.class, CharSerializer.INSTANCE, CharComparator.class);
public static final BasicTypeInfo<Date> DATE_TYPE_INFO = new BasicTypeInfo<Date>(Date.class, DateSerializer.INSTANCE, DateComparator.class);
+ public static final BasicTypeInfo<Void> VOID_TYPE_INFO = new BasicTypeInfo<Void>(Void.class, VoidSerializer.INSTANCE, null);
// --------------------------------------------------------------------------------------------
@@ -117,7 +120,11 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
@Override
public TypeComparator<T> createComparator(boolean sortOrderAscending) {
- return instantiateComparator(comparatorClass, sortOrderAscending);
+ if (comparatorClass != null) {
+ return instantiateComparator(comparatorClass, sortOrderAscending);
+ } else {
+ throw new InvalidTypesException("The type " + clazz.getSimpleName() + " cannot be used as a key.");
+ }
}
// --------------------------------------------------------------------------------------------
@@ -150,6 +157,7 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
throw new NullPointerException();
}
+ @SuppressWarnings("unchecked")
BasicTypeInfo<X> info = (BasicTypeInfo<X>) TYPES.get(type);
return info;
}
@@ -185,5 +193,7 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
TYPES.put(Character.class, CHAR_TYPE_INFO);
TYPES.put(char.class, CHAR_TYPE_INFO);
TYPES.put(Date.class, DATE_TYPE_INFO);
+ TYPES.put(Void.class, VOID_TYPE_INFO);
+ TYPES.put(void.class, VOID_TYPE_INFO);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d554faa3/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumComparator.java
index ed40bd4..bbb2f40 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumComparator.java
@@ -79,7 +79,7 @@ public final class EnumComparator<T extends Enum<T>> extends BasicTypeComparator
}
@Override
- public EnumComparator duplicate() {
- return new EnumComparator(ascendingComparison);
+ public EnumComparator<T> duplicate() {
+ return new EnumComparator<T>(ascendingComparison);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d554faa3/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
index a99fbf5..7ecf82a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java
@@ -93,7 +93,7 @@ public final class EnumSerializer<T extends Enum<T>> extends TypeSerializer<T> {
@Override
public boolean equals(Object obj) {
if(obj instanceof EnumSerializer) {
- EnumSerializer other = (EnumSerializer) obj;
+ EnumSerializer<?> other = (EnumSerializer<?>) obj;
return other.enumClass == this.enumClass;
} else {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d554faa3/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index 9d616e2..c72132d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-
/**
* A serializer for arrays of objects.
*
@@ -163,4 +162,9 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
return false;
}
}
+
+ @Override
+ public String toString() {
+ return "Serializer " + componentClass.getName() + "[]";
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d554faa3/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
new file mode 100644
index 0000000..33bb901
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * Serializer for {@link Void}. A Void value carries no data, so each record is written as a
+ * single placeholder byte and every read yields {@code null}.
+ */
+public final class VoidSerializer extends TypeSerializerSingleton<Void> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Ready-to-use instance of this serializer. */
+    public static final VoidSerializer INSTANCE = new VoidSerializer();
+
+    @Override
+    public boolean isImmutableType() {
+        return true;
+    }
+
+    @Override
+    public boolean isStateful() {
+        return false;
+    }
+
+    /** There is no Void value to create; the result is always {@code null}. */
+    @Override
+    public Void createInstance() {
+        return null;
+    }
+
+    /** Copying ignores its argument and always returns {@code null}. */
+    @Override
+    public Void copy(Void from) {
+        return null;
+    }
+
+    @Override
+    public Void copy(Void from, Void reuse) {
+        // a reuse object makes no difference, same result as the single-argument copy
+        return copy(from);
+    }
+
+    /** Returns 1, matching the single byte written by {@code serialize}. */
+    @Override
+    public int getLength() {
+        return 1;
+    }
+
+    @Override
+    public void serialize(Void record, DataOutputView target) throws IOException {
+        // emit one byte so that the stream makes progress for every record
+        target.write(0);
+    }
+
+    @Override
+    public Void deserialize(DataInputView source) throws IOException {
+        // consume the placeholder byte written by serialize
+        source.readByte();
+        return null;
+    }
+
+    @Override
+    public Void deserialize(Void reuse, DataInputView source) throws IOException {
+        // the reuse object is irrelevant, same work as the single-argument variant
+        return deserialize(source);
+    }
+
+    /** Transfers the one placeholder byte of a record from {@code source} to {@code target}. */
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws IOException {
+        final byte placeholder = source.readByte();
+        target.write(placeholder);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d554faa3/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
index 7afd3b4..8c4e090 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
@@ -179,7 +179,7 @@ public abstract class SequentialFormatTestBase<T> {
Configuration configuration = new Configuration();
configuration.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);
if (this.degreeOfParallelism == 1) {
- BinaryOutputFormat output = createOutputFormat(this.tempFile.toURI().toString(),
+ BinaryOutputFormat<T> output = createOutputFormat(this.tempFile.toURI().toString(),
configuration);
for (int index = 0; index < this.numberOfTuples; index++) {
output.writeRecord(this.getRecord(index));
@@ -190,7 +190,7 @@ public abstract class SequentialFormatTestBase<T> {
this.tempFile.mkdir();
int recordIndex = 0;
for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) {
- BinaryOutputFormat output = createOutputFormat(this.tempFile.toURI() + "/" +
+ BinaryOutputFormat<T> output = createOutputFormat(this.tempFile.toURI() + "/" +
(fileIndex+1), configuration);
for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) {
output.writeRecord(this.getRecord(recordIndex));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d554faa3/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
index 6f4fb58..90347b8 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
@@ -57,7 +57,7 @@ public class SerializedFormatTest extends SequentialFormatTestBase<Record> {
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
protected BinaryOutputFormat<Record> createOutputFormat(String path, Configuration
configuration) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d554faa3/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 29b9a61..4283ac1 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -163,7 +163,7 @@ under the License.
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
- <version>2.4</version>
+ <version>2.4</version><!--$NO-MVN-MAN-VER$-->
<executions>
<!-- Uber-jar -->
<execution>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d554faa3/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
new file mode 100644
index 0000000..d09fe60
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.misc
+
+import java.io.File
+import java.util.Random
+import java.io.BufferedWriter
+import java.io.FileWriter
+import org.apache.flink.api.scala._
+import java.io.BufferedReader
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync
+import java.io.FileReader
+import org.apache.flink.util.MutableObjectIterator
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.runtime.operators.sort.UnilateralSortMerger
+import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory
+import org.junit.Assert._;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
+
+/**
+ * Large-scale sorting test for case class records. The test method carries no JUnit
+ * annotation; it is started manually through the companion object's main method.
+ */
+class MassiveCaseClassSortingITCase {
+
+  val SEED : Long = 347569784659278346L
+
+  /**
+   * Writes NUM_STRINGS random string tuples to a temporary file, sorts that file once with the
+   * Unix `sort` command (LC_ALL=C) and once with the UnilateralSortMerger, and asserts that
+   * both produce the same records in the same order. Both temporary files are deleted at the
+   * end. Any exception raised on the way is printed and then fails the test.
+   */
+  def testStringTuplesSorting(): Unit = {
+
+    val NUM_STRINGS = 3000000
+    var input: File = null
+    var sorted: File = null
+
+    try {
+      input = generateFileWithStringTuples(NUM_STRINGS,
+        "http://some-uri.com/that/is/a/common/prefix/to/all")
+
+      sorted = File.createTempFile("sorted_strings", "txt")
+
+      // reference result: let the shell sort the generated lines into the second temp file
+      val command = Array("/bin/bash", "-c", "export LC_ALL=\"C\" && cat \""
+        + input.getAbsolutePath + "\" | sort > \"" + sorted.getAbsolutePath + "\"")
+
+      var p: Process = null
+      try {
+        p = Runtime.getRuntime.exec(command)
+        val retCode = p.waitFor()
+        if (retCode != 0) {
+          throw new Exception("Command failed with return code " + retCode)
+        }
+        // the command succeeded, so there is nothing left to destroy in the finally block
+        p = null
+      }
+      finally {
+        if (p != null) {
+          p.destroy()
+        }
+      }
+
+      var sorter: UnilateralSortMerger[StringTuple] = null
+
+      var reader: BufferedReader = null
+      var verifyReader: BufferedReader = null
+
+      try {
+        reader = new BufferedReader(new FileReader(input))
+        val inputIterator = new StringTupleReader(reader)
+
+        val typeInfo = implicitly[TypeInformation[StringTuple]]
+          .asInstanceOf[CompositeType[StringTuple]]
+
+        val serializer = typeInfo.createSerializer()
+        // the sort key consists of fields 0 and 1 of StringTuple (key1, key2)
+        val comparator = typeInfo.createComparator(Array(0, 1), Array(true, true), 0)
+
+        // NOTE(review): neither mm nor ioMan is shut down explicitly; confirm whether the
+        // sorter's close() covers them or whether they need their own cleanup
+        val mm = new DefaultMemoryManager(1024 * 1024, 1)
+        val ioMan = new IOManagerAsync()
+
+        sorter = new UnilateralSortMerger[StringTuple](mm, ioMan, inputIterator,
+          new DummyInvokable(),
+          new RuntimeStatelessSerializerFactory[StringTuple](serializer, classOf[StringTuple]),
+          comparator, 1.0, 4, 0.8f)
+
+        val sortedData = sorter.getIterator
+        reader.close()
+
+        verifyReader = new BufferedReader(new FileReader(sorted))
+        val verifyIterator = new StringTupleReader(verifyReader)
+
+        var num = 0
+        var hasMore = true
+
+        // walk the reference file and the sorter output in lock step
+        while (hasMore) {
+          val next = verifyIterator.next(null)
+
+          if (next != null ) {
+            num += 1
+
+            val nextFromFlinkSort = sortedData.next(null)
+
+            assertNotNull(nextFromFlinkSort)
+
+            assertEquals(next.key1, nextFromFlinkSort.key1)
+            assertEquals(next.key2, nextFromFlinkSort.key2)
+
+            // assert array equals does not work here
+            assertEquals(next.value.length, nextFromFlinkSort.value.length)
+            for (i <- 0 until next.value.length) {
+              assertEquals(next.value(i), nextFromFlinkSort.value(i))
+            }
+
+          }
+          else {
+            hasMore = false
+          }
+        }
+
+        // the sorter must not hold more records than the reference file
+        assertNull(sortedData.next(null))
+        assertEquals(NUM_STRINGS, num)
+      }
+      finally {
+        if (reader != null) {
+          reader.close()
+        }
+        if (verifyReader != null) {
+          verifyReader.close()
+        }
+        if (sorter != null) {
+          sorter.close()
+        }
+      }
+    }
+    catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        // report the problem as a test failure instead of swallowing it
+        fail(e.getMessage)
+      }
+    }
+    finally {
+      if (input != null) {
+        input.delete()
+      }
+      if (sorted != null) {
+        sorted.delete()
+      }
+    }
+  }
+
+
+  /**
+   * Creates a temporary file with `numStrings` lines. Each line consists of 2 to 6
+   * space-separated components, and each component is `prefix` followed by 10 to 29 random
+   * characters with codes 40 to 119 (a range that does not contain the space character).
+   * The random generator is seeded with SEED, so the content is reproducible.
+   *
+   * @param numStrings number of lines to write
+   * @param prefix     string placed at the start of every component
+   * @return the generated file; the caller is responsible for deleting it
+   */
+  private def generateFileWithStringTuples(numStrings: Int, prefix: String): File = {
+    val rnd = new Random(SEED)
+    val bld = new StringBuilder()
+    val f = File.createTempFile("strings", "txt")
+
+    var wrt: BufferedWriter = null
+
+    try {
+      wrt = new BufferedWriter(new FileWriter(f))
+
+      for (i <- 0 until numStrings) {
+        bld.setLength(0)
+        val numComps = rnd.nextInt(5) + 2
+
+        for (z <- 0 until numComps) {
+          if (z > 0) {
+            bld.append(' ')
+          }
+          bld.append(prefix)
+          val len = rnd.nextInt(20) + 10
+
+          for (k <- 0 until len) {
+            val c = (rnd.nextInt(80) + 40).toChar
+            bld.append(c)
+          }
+        }
+        val str = bld.toString
+        wrt.write(str)
+        wrt.newLine()
+      }
+    }
+    finally {
+      // the writer is still null if opening the file failed; closing it unguarded would
+      // replace the real error with a NullPointerException
+      if (wrt != null) {
+        wrt.close()
+      }
+    }
+    f
+  }
+}
+
+/** Entry point for running the sorting test by hand, outside of a test runner. */
+object MassiveCaseClassSortingITCase {
+
+  def main(args: Array[String]): Unit = {
+    val testCase = new MassiveCaseClassSortingITCase()
+    testCase.testStringTuplesSorting()
+  }
+}
+
+case class StringTuple(key1: String, key2: String, value: Array[String]) // sorted on fields 0, 1
+
+/**
+ * Iterator that turns each line of the wrapped reader into a StringTuple: the first two
+ * space-separated tokens become key1 and key2, and the complete token array becomes value.
+ */
+class StringTupleReader(val reader: BufferedReader) extends MutableObjectIterator[StringTuple] {
+
+  /** Parses the next line; returns null once the reader is exhausted. `reuse` is ignored. */
+  override def next(reuse: StringTuple): StringTuple = {
+    reader.readLine() match {
+      case null => null
+      case line =>
+        val tokens = line.split(" ")
+        StringTuple(tokens(0), tokens(1), tokens)
+    }
+  }
+}
+
+/** Placeholder task handed to the sorter; both overridden methods are empty. */
+class DummyInvokable extends AbstractInvokable {
+
+  override def registerInputOutput(): Unit = ()
+
+  override def invoke(): Unit = ()
+}