You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/07/31 14:41:52 UTC

git commit: [FLINK-1039] Fix pojo expression keys for group reduce

Repository: incubator-flink
Updated Branches:
  refs/heads/master 32d168f43 -> 23289d6ea


[FLINK-1039] Fix pojo expression keys for group reduce


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/23289d6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/23289d6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/23289d6e

Branch: refs/heads/master
Commit: 23289d6eadeb7ad8798f41e747a4962745e35769
Parents: 32d168f
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 31 02:32:37 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 31 14:39:06 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/operators/Keys.java |  5 ++---
 .../flink/api/java/operators/ReduceGroupOperator.java  | 13 +++++++++++++
 2 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/23289d6e/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 131fba9..2fdb520 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -197,8 +197,8 @@ public abstract class Keys<T> {
 		public ExpressionKeys(String[] expressions, TypeInformation<T> type) {
 			if (!(type instanceof PojoTypeInfo<?>)) {
 				throw new UnsupportedOperationException("Key expressions can only be used on POJOs." + " " +
-						"A POCO must have a default constructor without arguments and not have readObject" +
-						" and/or writeObject methods. Also, it can only have nested POJOs or primitive (also boxed)" +
+						"A POJO must have a default constructor without arguments and not have readObject" +
+						" and/or writeObject methods. A current restriction is that it can only have nested POJOs or primitive (also boxed)" +
 						" fields.");
 			}
 			PojoTypeInfo<?> pojoType = (PojoTypeInfo<?>) type;
@@ -212,7 +212,6 @@ public abstract class Keys<T> {
 							" type " + type.toString() + ".");
 				}
 			}
-
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/23289d6e/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
index 099860c..d88d43d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceGroupOperator.java
@@ -166,6 +166,19 @@ public class ReduceGroupOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 			
 			return po;
 		}
+		else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
+
+			int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
+			UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
+			GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>> po =
+					new GroupReduceOperatorBase<IN, OUT, GenericGroupReduce<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
+
+			po.setCombinable(combinable);
+			po.setInput(input);
+			po.setDegreeOfParallelism(this.getParallelism());
+			
+			return po;
+		}
 		else {
 			throw new UnsupportedOperationException("Unrecognized key type.");
 		}