You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/06/04 13:16:50 UTC
flink git commit: [FLINK-2135] Fix faulty cast to GroupReduceFunction
Repository: flink
Updated Branches:
refs/heads/master 0dea359b3 -> 0081fb2ef
[FLINK-2135] Fix faulty cast to GroupReduceFunction
This closes #769
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0081fb2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0081fb2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0081fb2e
Branch: refs/heads/master
Commit: 0081fb2ef2bd03d06a786dd8988865d2ff6168c2
Parents: 0dea359
Author: Robert Metzger <rm...@apache.org>
Authored: Wed Jun 3 15:52:12 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 4 11:48:06 2015 +0200
----------------------------------------------------------------------
...PlanUnwrappingSortedReduceGroupOperator.java | 8 +++---
.../test/javaApiOperators/FirstNITCase.java | 29 ++++++++++++++++++++
2 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0081fb2e/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
index 63ebfa4..e4d41f4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
@@ -37,7 +37,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
public PlanUnwrappingSortedReduceGroupOperator(GroupReduceFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name,
TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey, boolean combinable)
{
- super(combinable ? new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf),
+ super(combinable ? new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2>(udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2>(udf),
new UnaryOperatorInformation<Tuple3<K1, K2, IN>, OUT>(typeInfoWithKey, outType), groupingKey.computeLogicalKeyPositions(), name);
super.setCombinable(combinable);
@@ -46,7 +46,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
// --------------------------------------------------------------------------------------------
@RichGroupReduceFunction.Combinable
- public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<RichGroupReduceFunction<IN, OUT>>
+ public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, GroupCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>>
{
@@ -55,7 +55,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
private Tuple3UnwrappingIterator<IN, K1, K2> iter;
private Tuple3WrappingCollector<IN, K1, K2> coll;
- private TupleUnwrappingGroupCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) {
+ private TupleUnwrappingGroupCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) {
super(wrapped);
this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>();
this.coll = new Tuple3WrappingCollector<IN, K1, K2>(this.iter);
@@ -72,7 +72,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
public void combine(Iterable<Tuple3<K1, K2, IN>> values, Collector<Tuple3<K1, K2, IN>> out) throws Exception {
iter.set(values.iterator());
coll.set(out);
- this.wrappedFunction.combine(iter, coll);
+ ((GroupCombineFunction)this.wrappedFunction).combine(iter, coll);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0081fb2e/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
index feb1169..15d98dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -114,6 +116,33 @@ public class FirstNITCase extends MultipleProgramsTestBase {
+ "(5,15)\n(5,14)\n(5,13)\n"
+ "(6,21)\n(6,20)\n(6,19)\n";
}
+
+ /**
+ * Test for FLINK-2135
+ */
+ @Test
+ public void testFaultyCast() throws Exception {
+ ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<String> b = ee.fromElements("a", "b");
+ GroupReduceOperator<String, String> a = b.groupBy(new KeySelector<String, Long>() {
+ @Override
+ public Long getKey(String value) throws Exception {
+ return 1L;
+ }
+ }).sortGroup(new KeySelector<String, Double>() {
+ @Override
+ public Double getKey(String value) throws Exception {
+ return 1.0;
+ }
+ }, Order.DESCENDING).first(1);
+
+ b.writeAsText(resultPath);
+ ee.execute();
+
+ expected = "a\nb";
+
+ }
public static class OneMapper implements MapFunction<Tuple3<Integer, Long, String>, Tuple1<Integer>> {
private static final long serialVersionUID = 1L;