You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/06/04 13:16:50 UTC

flink git commit: [FLINK-2135] Fix faulty cast to GroupReduceFunction

Repository: flink
Updated Branches:
  refs/heads/master 0dea359b3 -> 0081fb2ef


[FLINK-2135] Fix faulty cast to GroupReduceFunction

This closes #769


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0081fb2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0081fb2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0081fb2e

Branch: refs/heads/master
Commit: 0081fb2ef2bd03d06a786dd8988865d2ff6168c2
Parents: 0dea359
Author: Robert Metzger <rm...@apache.org>
Authored: Wed Jun 3 15:52:12 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 4 11:48:06 2015 +0200

----------------------------------------------------------------------
 ...PlanUnwrappingSortedReduceGroupOperator.java |  8 +++---
 .../test/javaApiOperators/FirstNITCase.java     | 29 ++++++++++++++++++++
 2 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0081fb2e/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
index 63ebfa4..e4d41f4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
@@ -37,7 +37,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 	public PlanUnwrappingSortedReduceGroupOperator(GroupReduceFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name,
 											TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey, boolean combinable)
 	{
-		super(combinable ? new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf),
+		super(combinable ? new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2>(udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf),
 			new UnaryOperatorInformation<Tuple3<K1, K2, IN>, OUT>(typeInfoWithKey, outType), groupingKey.computeLogicalKeyPositions(), name);
 
 		super.setCombinable(combinable);
@@ -46,7 +46,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 	// --------------------------------------------------------------------------------------------
 
 	@RichGroupReduceFunction.Combinable
-	public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>>
+	public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
 		implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, GroupCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>>
 	{
 
@@ -55,7 +55,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 		private Tuple3UnwrappingIterator<IN, K1, K2> iter;
 		private Tuple3WrappingCollector<IN, K1, K2> coll;
 
-		private TupleUnwrappingGroupCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) {
+		private TupleUnwrappingGroupCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) {
 			super(wrapped);
 			this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>();
 			this.coll = new Tuple3WrappingCollector<IN, K1, K2>(this.iter);
@@ -72,7 +72,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
 		public void combine(Iterable<Tuple3<K1, K2, IN>> values, Collector<Tuple3<K1, K2, IN>> out) throws Exception {
 			iter.set(values.iterator());
 			coll.set(out);
-			this.wrappedFunction.combine(iter, coll);
+			((GroupCombineFunction)this.wrappedFunction).combine(iter, coll);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0081fb2e/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
index feb1169..15d98dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -114,6 +116,33 @@ public class FirstNITCase extends MultipleProgramsTestBase {
 				+ "(5,15)\n(5,14)\n(5,13)\n"
 				+ "(6,21)\n(6,20)\n(6,19)\n";
 	}
+
+	/**
+	 * Test for FLINK-2135
+	 */
+	@Test
+	public void testFaultyCast() throws Exception {
+		ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<String> b = ee.fromElements("a", "b");
+		GroupReduceOperator<String, String> a = b.groupBy(new KeySelector<String, Long>() {
+			@Override
+			public Long getKey(String value) throws Exception {
+				return 1L;
+			}
+		}).sortGroup(new KeySelector<String, Double>() {
+			@Override
+			public Double getKey(String value) throws Exception {
+				return 1.0;
+			}
+		}, Order.DESCENDING).first(1);
+
+		b.writeAsText(resultPath);
+		ee.execute();
+
+		expected = "a\nb";
+
+	}
 	
 	public static class OneMapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple1<Integer>> {
 		private static final long serialVersionUID = 1L;