You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/07/31 14:41:52 UTC
git commit: [FLINK-1039] Fix pojo expression keys for group reduce
Repository: incubator-flink
Updated Branches:
refs/heads/master 32d168f43 -> 23289d6ea
[FLINK-1039] Fix pojo expression keys for group reduce
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/23289d6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/23289d6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/23289d6e
Branch: refs/heads/master
Commit: 23289d6eadeb7ad8798f41e747a4962745e35769
Parents: 32d168f
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 31 02:32:37 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 31 14:39:06 2014 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/api/java/operators/Keys.java | 5 ++---
.../flink/api/java/operators/ReduceGroupOperator.java | 13 +++++++++++++
2 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/23289d6e/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 131fba9..2fdb520 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -197,8 +197,8 @@ public abstract class Keys<T> {
public ExpressionKeys(String[] expressions, TypeInformation<T> type) {
if (!(type instanceof PojoTypeInfo<?>)) {
throw new UnsupportedOperationException("Key expressions can only be used on POJOs." + " " +
- "A POCO must have a default constructor without arguments and not have readObject" +
- " and/or writeObject methods. Also, it can only have nested POJOs or primitive (also boxed)" +
+ "A POJO must have a default constructor without arguments and not have readObject" +
+ " and/or writeObject methods. A current restriction is that it can only have nested POJOs or primitive (also boxed)" +
" fields.");
}
PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) type;
@@ -212,7 +212,6 @@ public abstract class Keys<T> {
" type " + type.toString() + ".");
}
}
-
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/23289d6e/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
index 099860c..d88d43d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
@@ -166,6 +166,19 @@ public class ReduceGroupOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
return po;
}
+ else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
+
+ int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
+ UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
+ GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>> po =
+ new GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
+
+ po.setCombinable(combinable);
+ po.setInput(input);
+ po.setDegreeOfParallelism(this.getParallelism());
+
+ return po;
+ }
else {
throw new UnsupportedOperationException("Unrecognized key type.");
}