You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/09/08 16:18:52 UTC
[1/2] git commit: [FLINK-925] Support KeySelector function returning Tuple types
Repository: incubator-flink
Updated Branches:
refs/heads/master c0c2abda5 -> 122c9b023
[FLINK-925] Support KeySelector function returning Tuple types
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/fb3bdeac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/fb3bdeac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/fb3bdeac
Branch: refs/heads/master
Commit: fb3bdeac0b0c73e905945a1ecdbf29bf83ba3a6e
Parents: c0c2abd
Author: TobiasWiens <to...@gmail.com>
Authored: Sun Jul 6 17:47:00 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Sep 8 16:17:32 2014 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/api/java/DataSet.java | 3 +-
.../flink/api/java/operators/JoinOperator.java | 2 +-
.../flink/api/java/typeutils/TupleTypeInfo.java | 17 +-
.../java/typeutils/runtime/TupleComparator.java | 3 +-
.../runtime/TupleComparatorTTT1Test.java | 155 +++++++++++++++++++
.../runtime/TupleComparatorTTT2Test.java | 153 ++++++++++++++++++
.../runtime/TupleComparatorTTT3Test.java | 154 ++++++++++++++++++
7 files changed, 481 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index ca2a5e9..4688349 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -456,7 +456,8 @@ public abstract class DataSet<T> {
* @see org.apache.flink.api.java.operators.GroupReduceOperator
* @see DataSet
*/
- public <K extends Comparable<K>> UnsortedGrouping<T> groupBy(KeySelector<T, K> keyExtractor) {
+
+ public <K> UnsortedGrouping<T> groupBy(KeySelector<T, K> keyExtractor) {
return new UnsortedGrouping<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType()));
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 2efe7e9..1ca2ec9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -756,7 +756,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
* @see KeySelector
* @see DataSet
*/
- public <K extends Comparable<K>> JoinOperatorSetsPredicate where(KeySelector<I1, K> keySelector) {
+ public <K> JoinOperatorSetsPredicate where(KeySelector<I1, K> keySelector) {
return new JoinOperatorSetsPredicate(new Keys.SelectorFunctionKeys<I1, K>(keySelector, input1.getType()));
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index 737be56..94d3252 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -120,7 +120,7 @@ public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T> implement
}
// special case for tuples where field zero is the key field
- if (logicalKeyFields.length == 1 && logicalKeyFields[0] == 0) {
+ if (logicalKeyFields.length == 1 && logicalKeyFields[0] == 0 && !types[0].isTupleType()) {
return createLeadingFieldComparator(orders[0], types[0]);
}
@@ -141,8 +141,21 @@ public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T> implement
int keyPos = logicalKeyFields[i];
if (types[keyPos].isKeyType() && types[keyPos] instanceof AtomicType) {
fieldComparators[i] = ((AtomicType<?>) types[keyPos]).createComparator(orders[i]);
+ } else if(types[keyPos].isTupleType() && types[keyPos] instanceof TupleTypeInfo){ // Check for tuple
+ TupleTypeInfo<?> tupleType = (TupleTypeInfo<?>) types[keyPos];
+
+ // All fields are key
+ int[] allFieldsKey = new int[tupleType.types.length];
+ for(int h = 0; h < tupleType.types.length; h++){
+ allFieldsKey[h]=h;
+ }
+
+ // Prepare order
+ boolean[] tupleOrders = new boolean[tupleType.types.length];
+ Arrays.fill(tupleOrders, orders[i]);
+ fieldComparators[i] = tupleType.createComparator(allFieldsKey, tupleOrders);
} else {
- throw new IllegalArgumentException("The field at position " + i + " (" + types[keyPos] + ") is no atomic key type.");
+ throw new IllegalArgumentException("The field at position " + i + " (" + types[keyPos] + ") is no atomic key type nor tuple type.");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
index 48cf08b..9de9824 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparator.java
@@ -236,8 +236,7 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
try {
for (; i < keyPositions.length; i++) {
int keyPos = keyPositions[i];
- @SuppressWarnings("unchecked")
- int cmp = comparators[i].compare((T)first.getFieldNotNull(keyPos), (T)second.getFieldNotNull(keyPos));
+ int cmp = comparators[i].compare(first.getFieldNotNull(keyPos), second.getFieldNotNull(keyPos));
if (cmp != 0) {
return cmp;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
new file mode 100644
index 0000000..d406529
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringComparator;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+/**
+ * Sort-order tests for a {@link TupleComparator} over a {@code Tuple3} whose fields are all
+ * {@code Tuple2}s, using field 0 as the only key.
+ *
+ * <p>The tuple-typed key field is compared with a nested {@link TupleComparator} that uses every
+ * field of the inner {@code Tuple2} as a key, all in the same sort direction, which is the shape
+ * {@code TupleTypeInfo#createComparator} builds for a tuple-typed key field.
+ */
+public class TupleComparatorTTT1Test extends TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> {
+
+	/** Test records, listed in ascending order of the key (field 0). */
+	@SuppressWarnings("unchecked")
+	Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] dataISD = new Tuple3[]{
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 1.0), new Tuple2<Long, Long>(1L, 1L), new Tuple2<Integer, Long>(4, -10L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 2.0), new Tuple2<Long, Long>(1L, 2L), new Tuple2<Integer, Long>(4, -5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.0), new Tuple2<Long, Long>(1L, 3L), new Tuple2<Integer, Long>(4, 0L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.5), new Tuple2<Long, Long>(1L, 4L), new Tuple2<Integer, Long>(4, 5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 4325.12), new Tuple2<Long, Long>(1L, 5L), new Tuple2<Integer, Long>(4, 15L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 1.0), new Tuple2<Long, Long>(2L, 4L), new Tuple2<Integer, Long>(45, -5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 2.0), new Tuple2<Long, Long>(2L, 6L), new Tuple2<Integer, Long>(45, 5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.0), new Tuple2<Long, Long>(2L, 8L), new Tuple2<Integer, Long>(323, 2L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.5), new Tuple2<Long, Long>(2L, 9L), new Tuple2<Integer, Long>(323, 5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 4325.12), new Tuple2<Long, Long>(2L, 123L), new Tuple2<Integer, Long>(555, 1L))
+
+	};
+
+	/**
+	 * Creates the comparator under test, keyed on field 0 only.
+	 *
+	 * <p>{@link TupleComparator} pairs {@code comparators[i]} with {@code keyPositions[i]}, so exactly
+	 * one comparator is supplied (for the {@code Tuple2<String, Double>} at field 0). The serializer
+	 * array still covers every field of the record.
+	 *
+	 * @param ascending sort direction applied to every nested key field
+	 * @return the comparator under test
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createComparator(
+			boolean ascending) {
+		return new TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
+				new int[] { 0 },
+				new TypeComparator[] {
+						// comparator for key position 0: Tuple2<String, Double>, all fields as keys
+						new TupleComparator<Tuple2<String, Double>>(
+								new int[] { 0, 1 },
+								new TypeComparator[] {
+										new StringComparator(ascending),
+										new DoubleComparator(ascending) },
+								new TypeSerializer[] {
+										StringSerializer.INSTANCE,
+										DoubleSerializer.INSTANCE }) },
+				new TypeSerializer[] {
+						new TupleSerializer<Tuple2<String, Double>>(
+								(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
+								new TypeSerializer[] {
+										StringSerializer.INSTANCE,
+										DoubleSerializer.INSTANCE }),
+						new TupleSerializer<Tuple2<Long, Long>>(
+								(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+								new TypeSerializer[] {
+										LongSerializer.INSTANCE,
+										LongSerializer.INSTANCE }),
+						new TupleSerializer<Tuple2<Integer, Long>>(
+								(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
+								new TypeSerializer[] {
+										IntSerializer.INSTANCE,
+										LongSerializer.INSTANCE }) });
+	}
+
+	/**
+	 * Creates a serializer for the full {@code Tuple3} record, built from one nested
+	 * {@link TupleSerializer} per {@code Tuple2} field.
+	 *
+	 * @return the record serializer
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createSerializer() {
+		return new TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
+				(Class<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>) (Class<?>) Tuple3.class,
+				new TypeSerializer[]{
+					new TupleSerializer<Tuple2<String, Double>> (
+							(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
+							new TypeSerializer[]{
+								StringSerializer.INSTANCE,
+								DoubleSerializer.INSTANCE
+					}),
+					new TupleSerializer<Tuple2<Long, Long>> (
+							(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+							new TypeSerializer[]{
+								LongSerializer.INSTANCE,
+								LongSerializer.INSTANCE
+					}),
+					new TupleSerializer<Tuple2<Integer, Long>> (
+							(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
+							new TypeSerializer[]{
+								IntSerializer.INSTANCE,
+								LongSerializer.INSTANCE
+					})
+				});
+	}
+
+	/** @return the test records, already in ascending key order */
+	@Override
+	protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] getSortedTestData() {
+		return this.dataISD;
+	}
+
+	/**
+	 * Asserts field-by-field equality of two records, descending one level into {@code Tuple2}
+	 * fields; any other field value is compared directly.
+	 */
+	@Override
+	protected void deepEquals(
+			String message,
+			Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> should,
+			Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> is) {
+
+		for (int x = 0; x < should.getArity(); x++) {
+			// Tuple2 fields are compared element-wise; anything else (e.g. null) via assertEquals.
+			if (should.getField(x) instanceof Tuple2) {
+				this.deepEquals(message, (Tuple2<?,?>) should.getField(x), (Tuple2<?,?>) is.getField(x));
+			}
+			else {
+				assertEquals(message, should.getField(x), is.getField(x));
+			}
+		}
+	}
+
+	/** Asserts element-wise equality of two {@code Tuple2}s, using the arity of {@code should}. */
+	protected void deepEquals(String message, Tuple2<?,?> should, Tuple2<?,?> is) {
+		for (int x = 0; x < should.getArity(); x++) {
+			assertEquals(message, should.getField(x), is.getField(x));
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
new file mode 100644
index 0000000..11f3a5e
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT2Test.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringComparator;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+/**
+ * Sort-order tests for a {@link TupleComparator} over a {@code Tuple3} whose fields are all
+ * {@code Tuple2}s, keyed on fields 0 and 2 (field 1 is not a key).
+ *
+ * <p>Each tuple-typed key field is compared with a nested {@link TupleComparator} that uses every
+ * field of the inner {@code Tuple2} as a key, all in the same sort direction.
+ */
+public class TupleComparatorTTT2Test extends TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> {
+
+	/** Test records, listed in ascending order of the keys (field 0, then field 2). */
+	@SuppressWarnings("unchecked")
+	Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] dataISD = new Tuple3[]{
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 1.0), new Tuple2<Long, Long>(1L, 1L), new Tuple2<Integer, Long>(4, -10L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 2.0), new Tuple2<Long, Long>(1L, 2L), new Tuple2<Integer, Long>(4, -5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.0), new Tuple2<Long, Long>(1L, 3L), new Tuple2<Integer, Long>(4, 0L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.5), new Tuple2<Long, Long>(1L, 4L), new Tuple2<Integer, Long>(4, 5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 4325.12), new Tuple2<Long, Long>(1L, 5L), new Tuple2<Integer, Long>(4, 15L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 1.0), new Tuple2<Long, Long>(2L, 4L), new Tuple2<Integer, Long>(45, -5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 2.0), new Tuple2<Long, Long>(2L, 6L), new Tuple2<Integer, Long>(45, 5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.0), new Tuple2<Long, Long>(2L, 8L), new Tuple2<Integer, Long>(323, 2L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.5), new Tuple2<Long, Long>(2L, 9L), new Tuple2<Integer, Long>(323, 5L)),
+		new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 4325.12), new Tuple2<Long, Long>(2L, 123L), new Tuple2<Integer, Long>(555, 1L))
+
+	};
+
+	/**
+	 * Creates the comparator under test, keyed on fields 0 and 2.
+	 *
+	 * <p>{@link TupleComparator} pairs {@code comparators[i]} with {@code keyPositions[i]}, so the
+	 * comparator array holds exactly one entry per key position, in key order: index 0 compares the
+	 * {@code Tuple2<String, Double>} at field 0, and index 1 compares the
+	 * {@code Tuple2<Integer, Long>} at field 2. The serializer array covers every field up to and
+	 * including the highest key position.
+	 *
+	 * @param ascending sort direction applied to every nested key field
+	 * @return the comparator under test
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createComparator(
+			boolean ascending) {
+		return new TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
+				new int[] { 0, 2 },
+				new TypeComparator[] {
+						// comparator for key position 0: Tuple2<String, Double>
+						new TupleComparator<Tuple2<String, Double>>(
+								new int[] { 0, 1 },
+								new TypeComparator[] {
+										new StringComparator(ascending),
+										new DoubleComparator(ascending) },
+								new TypeSerializer[] {
+										StringSerializer.INSTANCE,
+										DoubleSerializer.INSTANCE }),
+						// comparator for key position 2: Tuple2<Integer, Long>
+						new TupleComparator<Tuple2<Integer, Long>>(
+								new int[] { 0, 1 },
+								new TypeComparator[] {
+										new IntComparator(ascending),
+										new LongComparator(ascending) },
+								new TypeSerializer[] {
+										IntSerializer.INSTANCE,
+										LongSerializer.INSTANCE }) },
+				new TypeSerializer[] {
+						new TupleSerializer<Tuple2<String, Double>>(
+								(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
+								new TypeSerializer[] {
+										StringSerializer.INSTANCE,
+										DoubleSerializer.INSTANCE }),
+						new TupleSerializer<Tuple2<Long, Long>>(
+								(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+								new TypeSerializer[] {
+										LongSerializer.INSTANCE,
+										LongSerializer.INSTANCE }),
+						new TupleSerializer<Tuple2<Integer, Long>>(
+								(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
+								new TypeSerializer[] {
+										IntSerializer.INSTANCE,
+										LongSerializer.INSTANCE }) });
+	}
+
+	/**
+	 * Creates a serializer for the full {@code Tuple3} record, built from one nested
+	 * {@link TupleSerializer} per {@code Tuple2} field.
+	 *
+	 * @return the record serializer
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createSerializer() {
+		return new TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
+				(Class<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>) (Class<?>) Tuple3.class,
+				new TypeSerializer[]{
+					new TupleSerializer<Tuple2<String, Double>> (
+							(Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
+							new TypeSerializer[]{
+								StringSerializer.INSTANCE,
+								DoubleSerializer.INSTANCE}),
+					new TupleSerializer<Tuple2<Long, Long>> (
+							(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+							new TypeSerializer[]{
+								LongSerializer.INSTANCE,
+								LongSerializer.INSTANCE}),
+					new TupleSerializer<Tuple2<Integer, Long>> (
+							(Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
+							new TypeSerializer[]{
+								IntSerializer.INSTANCE,
+								LongSerializer.INSTANCE})
+				});
+	}
+
+	/** @return the test records, already in ascending key order */
+	@Override
+	protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] getSortedTestData() {
+		return this.dataISD;
+	}
+
+	/**
+	 * Asserts field-by-field equality of two records, descending one level into {@code Tuple2}
+	 * fields; any other field value is compared directly.
+	 */
+	@Override
+	protected void deepEquals(
+			String message,
+			Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> should,
+			Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> is) {
+
+		for (int x = 0; x < should.getArity(); x++) {
+			// Tuple2 fields are compared element-wise; anything else (e.g. null) via assertEquals.
+			if (should.getField(x) instanceof Tuple2) {
+				this.deepEquals(message, (Tuple2<?,?>) should.getField(x), (Tuple2<?,?>) is.getField(x));
+			}
+			else {
+				assertEquals(message, should.getField(x), is.getField(x));
+			}
+		}
+	}
+
+	/** Asserts element-wise equality of two {@code Tuple2}s, using the arity of {@code should}. */
+	protected void deepEquals(String message, Tuple2<?,?> should, Tuple2<?,?> is) {
+		for (int x = 0; x < should.getArity(); x++) {
+			assertEquals(message, should.getField(x), is.getField(x));
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/fb3bdeac/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
new file mode 100644
index 0000000..1339bca
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT3Test.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.typeutils.runtime;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleComparator;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringComparator;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
+
+public class TupleComparatorTTT3Test extends TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>{
+ @SuppressWarnings("unchecked")
+ Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] dataISD = new Tuple3[]{
+ new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 1.0), new Tuple2<Long, Long>(1L, 1L), new Tuple2<Integer, Long>(4, -10L)),
+ new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 2.0), new Tuple2<Long, Long>(1L, 2L), new Tuple2<Integer, Long>(4, -5L)),
+ new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.0), new Tuple2<Long, Long>(1L, 3L), new Tuple2<Integer, Long>(4, 0L)),
+ new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 3.5), new Tuple2<Long, Long>(1L, 4L), new Tuple2<Integer, Long>(4, 5L)),
+ new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("hello", 4325.12), new Tuple2<Long, Long>(1L, 5L), new Tuple2<Integer, Long>(4, 15L)),
+ new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 1.0), new Tuple2<Long, Long>(2L, 4L), new Tuple2<Integer, Long>(45, -5L)),
+ new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 2.0), new Tuple2<Long, Long>(2L, 6L), new Tuple2<Integer, Long>(45, 5L)),
+ new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.0), new Tuple2<Long, Long>(2L, 8L), new Tuple2<Integer, Long>(323, 2L)),
+ new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 3.5), new Tuple2<Long, Long>(2L, 9L), new Tuple2<Integer, Long>(323, 5L)),
+ new Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>(new Tuple2<String, Double>("world", 4325.12), new Tuple2<Long, Long>(2L, 123L), new Tuple2<Integer, Long>(555, 1L))
+
+ };
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createComparator(
+ boolean ascending) {
+ return new TupleComparator<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
+ new int[] { 0, 1, 2 },
+ new TypeComparator[] {
+ new TupleComparator<Tuple2<String, Double>>(
+ new int[] { 0, 1 },
+ new TypeComparator[] {
+ new StringComparator(ascending),
+ new DoubleComparator(ascending) },
+ new TypeSerializer[] {
+ StringSerializer.INSTANCE,
+ DoubleSerializer.INSTANCE }),
+ new TupleComparator<Tuple2<Long, Long>>(
+ new int[] { 0, 1 },
+ new TypeComparator[] {
+ new LongComparator(ascending),
+ new LongComparator(ascending) },
+ new TypeSerializer[] {
+ LongSerializer.INSTANCE,
+ LongSerializer.INSTANCE }),
+ new TupleComparator<Tuple2<Integer, Long>>(
+ new int[] { 0, 1 },
+ new TypeComparator[] {
+ new IntComparator(ascending),
+ new LongComparator(ascending) },
+ new TypeSerializer[] {
+ IntSerializer.INSTANCE,
+ LongSerializer.INSTANCE }) },
+ new TypeSerializer[] {
+ new TupleSerializer<Tuple2<String, Double>>(
+ (Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
+ new TypeSerializer[] {
+ StringSerializer.INSTANCE,
+ DoubleSerializer.INSTANCE }),
+ new TupleSerializer<Tuple2<Long, Long>>(
+ (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+ new TypeSerializer[] {
+ LongSerializer.INSTANCE,
+ LongSerializer.INSTANCE }),
+ new TupleSerializer<Tuple2<Integer, Long>>(
+ (Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
+ new TypeSerializer[] {
+ IntSerializer.INSTANCE,
+ LongSerializer.INSTANCE }) });
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>> createSerializer() {
+ return new TupleSerializer<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>(
+ (Class<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>>) (Class<?>) Tuple3.class,
+ new TypeSerializer[]{
+ new TupleSerializer<Tuple2<String, Double>> (
+ (Class<Tuple2<String, Double>>) (Class<?>) Tuple2.class,
+ new TypeSerializer[]{
+ StringSerializer.INSTANCE,
+ DoubleSerializer.INSTANCE
+ }),
+ new TupleSerializer<Tuple2<Long, Long>> (
+ (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+ new TypeSerializer[]{
+ LongSerializer.INSTANCE,
+ LongSerializer.INSTANCE
+ }),
+ new TupleSerializer<Tuple2<Integer, Long>> (
+ (Class<Tuple2<Integer, Long>>) (Class<?>) Tuple2.class,
+ new TypeSerializer[]{
+ IntSerializer.INSTANCE,
+ LongSerializer.INSTANCE
+ })
+ });
+ }
+
+ @Override
+ protected Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>>[] getSortedTestData() {
+ return this.dataISD;
+ }
+
+ @Override
+ protected void deepEquals(
+ String message,
+ Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> should,
+ Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, Tuple2<Integer, Long>> is) {
+
+ for (int x = 0; x < should.getArity(); x++) {
+ // Check whether field is of type Tuple2 because assertEquals must be called on the non Tuple2 fields.
+ if(should.getField(x) instanceof Tuple2) {
+ this.deepEquals(message, (Tuple2<?,?>) should.getField(x), (Tuple2<?,?>)is.getField(x));
+ }
+ else {
+ assertEquals(message, should.getField(x), is.getField(x));
+ }
+ }// For
+ }
+
+ protected void deepEquals(String message, Tuple2<?,?> should, Tuple2<?,?> is) {
+ for (int x = 0; x < should.getArity(); x++) {
+ assertEquals(message, should.getField(x), is.getField(x));
+ }
+ }
+}
[2/2] git commit: [FLINK-925] Extended for distinct operator and added test cases
Posted by fh...@apache.org.
[FLINK-925] Extended for distinct operator and added test cases
This closes #59
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/122c9b02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/122c9b02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/122c9b02
Branch: refs/heads/master
Commit: 122c9b023cc5f9fcd5cb4914bd90afde0b3c6fc0
Parents: fb3bdea
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Sep 8 13:34:15 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Sep 8 16:17:47 2014 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/api/java/DataSet.java | 2 +-
.../apache/flink/api/java/operators/Keys.java | 5 ++
.../flink/api/java/typeutils/TupleTypeInfo.java | 18 +++-
.../flink/api/java/operator/GroupingTest.java | 66 ++++++++++++--
.../test/javaApiOperators/CoGroupITCase.java | 92 +++++++++++++++++++-
.../test/javaApiOperators/DistinctITCase.java | 41 ++++++++-
.../javaApiOperators/GroupReduceITCase.java | 36 +++++++-
.../flink/test/javaApiOperators/JoinITCase.java | 42 ++++++++-
.../test/javaApiOperators/ReduceITCase.java | 40 ++++++++-
9 files changed, 324 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 4688349..de86eee 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -399,7 +399,7 @@ public abstract class DataSet<T> {
* distinction of the DataSet is decided.
* @return A DistinctOperator that represents the distinct DataSet.
*/
- public <K extends Comparable<K>> DistinctOperator<T> distinct(KeySelector<T, K> keyExtractor) {
+ public <K> DistinctOperator<T> distinct(KeySelector<T, K> keyExtractor) {
return new DistinctOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType()));
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 2fdb520..8019cc8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -134,6 +134,11 @@ public abstract class Keys<T> {
this.keyExtractor = keyExtractor;
this.keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
+
+ if (!this.keyType.isKeyType()) {
+ throw new IllegalArgumentException("Invalid type of KeySelector keys");
+ }
+
}
public TypeInformation<K> getKeyType() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index 94d3252..b9dce11 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -95,7 +95,7 @@ public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T> implement
@Override
public boolean isKeyType() {
- return false;
+ return this.isValidKeyType(this);
}
@Override
@@ -228,6 +228,22 @@ public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T> implement
return tupleInfo;
}
+ private boolean isValidKeyType(TypeInformation<?> typeInfo) {
+ if(typeInfo instanceof TupleTypeInfo) {
+ TupleTypeInfo<?> tupleType = ((TupleTypeInfo<?>)typeInfo);
+ for(int i=0;i<tupleType.getArity();i++) {
+ if (!isValidKeyType(tupleType.getTypeAt(i))) {
+ return false;
+ }
+ }
+ return true;
+ } else if(typeInfo.isKeyType()) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// The following lines are generated.
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
index f5cd7b9..34b2ac8 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
@@ -22,20 +22,19 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import org.junit.Assert;
-
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
public class GroupingTest {
// TUPLE DATA
@@ -189,7 +188,64 @@ public class GroupingTest {
} catch(Exception e) {
Assert.fail();
}
+ }
+
+ @Test
+ @SuppressWarnings("serial")
+ public void testGroupByKeySelector2() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ this.customTypeData.add(new CustomType());
+
+ try {
+ DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+ // should work
+ customDs.groupBy(
+ new KeySelector<GroupingTest.CustomType, Tuple2<Integer, Long>>() {
+ @Override
+ public Tuple2<Integer,Long> getKey(CustomType value) {
+ return new Tuple2<Integer, Long>(value.myInt, value.myLong);
+ }
+ });
+ } catch(Exception e) {
+ Assert.fail();
+ }
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ @SuppressWarnings("serial")
+ public void testGroupByKeySelector3() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ this.customTypeData.add(new CustomType());
+
+ DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+ // should not work
+ customDs.groupBy(
+ new KeySelector<GroupingTest.CustomType, CustomType>() {
+ @Override
+ public CustomType getKey(CustomType value) {
+ return value;
+ }
+ });
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ @SuppressWarnings("serial")
+ public void testGroupByKeySelector4() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ this.customTypeData.add(new CustomType());
+
+ DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+ // should not work
+ customDs.groupBy(
+ new KeySelector<GroupingTest.CustomType, Tuple2<Integer, GroupingTest.CustomType>>() {
+ @Override
+ public Tuple2<Integer, CustomType> getKey(CustomType value) {
+ return new Tuple2<Integer, CustomType>(value.myInt, value);
+ }
+ });
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index d59d721..f0229cb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -20,10 +20,14 @@ package org.apache.flink.test.javaApiOperators;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
+import java.util.List;
import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.functions.RichCoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -37,13 +41,11 @@ import org.apache.flink.util.Collector;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
@RunWith(Parameterized.class)
public class CoGroupITCase extends JavaProgramTestBase {
- private static int NUM_PROGRAMS = 7;
+ private static int NUM_PROGRAMS = 9;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
@@ -293,6 +295,67 @@ public class CoGroupITCase extends JavaProgramTestBase {
"14,5,test\n";
}
+ case 8: {
+ /*
+ * CoGroup with multiple key fields
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+
+ DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+ where(0,4).equalTo(0,1).with(new Tuple5Tuple3CoGroup());
+
+ coGrouped.writeAsCsv(resultPath);
+ env.execute();
+
+ return "1,1,Hallo\n" +
+ "2,2,Hallo Welt\n" +
+ "3,2,Hallo Welt wie gehts?\n" +
+ "3,2,ABC\n" +
+ "5,3,HIJ\n" +
+ "5,3,IJK\n";
+ }
+ case 9: {
+ /*
+ * CoGroup with multiple key fields, defined by tuple-returning key selectors
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+
+ DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+ where(new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f4);
+ }
+ }).
+ equalTo(new KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f1);
+ }
+ }).with(new Tuple5Tuple3CoGroup());
+
+ coGrouped.writeAsCsv(resultPath);
+ env.execute();
+
+ return "1,1,Hallo\n" +
+ "2,2,Hallo Welt\n" +
+ "3,2,Hallo Welt wie gehts?\n" +
+ "3,2,ABC\n" +
+ "5,3,HIJ\n" +
+ "5,3,IJK\n";
+ }
default:
throw new IllegalArgumentException("Invalid program id");
}
@@ -481,4 +544,27 @@ public class CoGroupITCase extends JavaProgramTestBase {
out.collect(new Tuple3<Integer, Integer, Integer>(id, sum, broadcast));
}
}
+
+ public static class Tuple5Tuple3CoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+ Iterable<Tuple3<Integer, Long, String>> second,
+ Collector<Tuple3<Integer, Long, String>> out)
+ {
+ List<String> strs = new ArrayList<String>();
+
+ for (Tuple5<Integer, Long, Integer, String, Long> t : first) {
+ strs.add(t.f3);
+ }
+
+ for(Tuple3<Integer, Long, String> t : second) {
+ for(String s : strs) {
+ out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
index 0c6f3cc..6e5cd9b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
@@ -22,9 +22,12 @@ import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
@@ -34,14 +37,12 @@ import org.apache.flink.test.util.JavaProgramTestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
@SuppressWarnings("serial")
@RunWith(Parameterized.class)
public class DistinctITCase extends JavaProgramTestBase {
- private static int NUM_PROGRAMS = 5;
+ private static int NUM_PROGRAMS = 6;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
@@ -202,6 +203,40 @@ public class DistinctITCase extends JavaProgramTestBase {
"2,2,Hello\n" +
"3,2,Hello world\n";
}
+ case 6: {
+
+ /*
+ * check correctness of distinct on tuples with a tuple-returning key selector
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple2<Integer, Long>> reduceDs = ds
+ .distinct(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>>() {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public Tuple2<Integer,Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f4);
+ }
+ })
+ .project(0,4).types(Integer.class, Long.class);
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "1,1\n" +
+ "2,1\n" +
+ "2,2\n" +
+ "3,2\n" +
+ "3,3\n" +
+ "4,1\n" +
+ "4,2\n" +
+ "5,1\n" +
+ "5,2\n" +
+ "5,3\n";
+ }
default:
throw new IllegalArgumentException("Invalid program id");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index bd10c5e..2e00d32 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -48,7 +48,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
@RunWith(Parameterized.class)
public class GroupReduceITCase extends JavaProgramTestBase {
- private static int NUM_PROGRAMS = 13;
+ private static int NUM_PROGRAMS = 14;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
@@ -411,6 +411,40 @@ public class GroupReduceITCase extends JavaProgramTestBase {
"111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
}
+ case 14: {
+ /*
+ * check correctness of groupReduce on tuples with tuple-returning key selector
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
+ groupBy(
+ new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f4);
+ }
+ }).reduceGroup(new Tuple5GroupReduce());
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "1,1,0,P-),1\n" +
+ "2,3,0,P-),1\n" +
+ "2,2,0,P-),2\n" +
+ "3,9,0,P-),2\n" +
+ "3,6,0,P-),3\n" +
+ "4,17,0,P-),1\n" +
+ "4,17,0,P-),2\n" +
+ "5,11,0,P-),1\n" +
+ "5,29,0,P-),2\n" +
+ "5,25,0,P-),3\n";
+ }
default: {
throw new IllegalArgumentException("Invalid program id");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
index a293cbf..45a5458 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
@@ -46,7 +46,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
@RunWith(Parameterized.class)
public class JoinITCase extends JavaProgramTestBase {
- private static int NUM_PROGRAMS = 13;
+ private static int NUM_PROGRAMS = 14;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
@@ -453,6 +453,46 @@ public class JoinITCase extends JavaProgramTestBase {
"2,2,Hello world,2,2,Hello world\n";
}
+ case 14: {
+ /*
+ * UDF Join on tuples with tuple-returning key selectors
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple2<String, String>> joinDs =
+ ds1.join(ds2)
+ .where(new KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f1);
+ }
+ })
+ .equalTo(new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f4);
+ }
+ })
+ .with(new T3T5FlatJoin());
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "Hi,Hallo\n" +
+ "Hello,Hallo Welt\n" +
+ "Hello world,Hallo Welt wie gehts?\n" +
+ "Hello world,ABC\n" +
+ "I am fine.,HIJ\n" +
+ "I am fine.,IJK\n";
+ }
default:
throw new IllegalArgumentException("Invalid program id");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
index a296a09..fd7fc9f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
@@ -24,8 +24,11 @@ import java.util.Collection;
import java.util.LinkedList;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
@@ -35,13 +38,11 @@ import org.apache.flink.test.util.JavaProgramTestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
@RunWith(Parameterized.class)
public class ReduceITCase extends JavaProgramTestBase {
- private static int NUM_PROGRAMS = 8;
+ private static int NUM_PROGRAMS = 9;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
@@ -271,6 +272,39 @@ public class ReduceITCase extends JavaProgramTestBase {
"65,5,Hi again!\n" +
"111,6,Hi again!\n";
}
+ case 9: {
+ /*
+ * Reduce with a Tuple-returning KeySelector
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds .
+ groupBy(
+ new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f4);
+ }
+ }).reduce(new Tuple5Reduce());
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ return "1,1,0,Hallo,1\n" +
+ "2,3,2,Hallo Welt wie,1\n" +
+ "2,2,1,Hallo Welt,2\n" +
+ "3,9,0,P-),2\n" +
+ "3,6,5,BCD,3\n" +
+ "4,17,0,P-),1\n" +
+ "4,17,0,P-),2\n" +
+ "5,11,10,GHI,1\n" +
+ "5,29,0,P-),2\n" +
+ "5,25,0,P-),3\n";
+ }
default:
throw new IllegalArgumentException("Invalid program id");
}