You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:29:01 UTC

[19/60] git commit: Perform TypeExtraction outside of Java Operators

Perform TypeExtraction outside of Java Operators

Before, FlatMapOperator, GroupReduceOperator, MapOperator, and MapPartitionOperator
performed the Type extraction themselves while the other Operators had TypeInformation
parameters. Now the are all unified, which makes it possible to use them from the
Scala API.

Also Key extraction for selector functions is moved outside of Keys.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6bbe2a05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6bbe2a05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6bbe2a05

Branch: refs/heads/master
Commit: 6bbe2a05ab4937a5a989aa2ed7451c1126d9faaa
Parents: 57b8e66
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Aug 22 18:17:48 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 22 09:59:57 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java | 23 ++++++++++++++------
 .../api/java/operators/CoGroupOperator.java     |  6 +++--
 .../api/java/operators/FlatMapOperator.java     |  6 ++---
 .../api/java/operators/GroupReduceOperator.java |  9 ++++----
 .../flink/api/java/operators/JoinOperator.java  |  6 +++--
 .../apache/flink/api/java/operators/Keys.java   | 11 +++++-----
 .../flink/api/java/operators/MapOperator.java   |  6 ++---
 .../java/operators/MapPartitionOperator.java    |  6 ++---
 .../api/java/operators/SortedGrouping.java      |  6 ++++-
 .../api/java/operators/UnsortedGrouping.java    |  6 ++++-
 10 files changed, 52 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6bbe2a05/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index de86eee..f960abd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -66,6 +66,8 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.types.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
 
 
 /**
@@ -145,7 +147,10 @@ public abstract class DataSet<T> {
 		if (FunctionUtils.isLambdaFunction(mapper)) {
 			throw new UnsupportedLambdaExpressionException();
 		}
-		return new MapOperator<T, R>(this, mapper);
+
+		TypeInformation<R> resultType = TypeExtractor.getMapReturnTypes(mapper, this.getType());
+
+		return new MapOperator<T, R>(this, resultType, mapper);
 	}
 
 
@@ -172,7 +177,8 @@ public abstract class DataSet<T> {
 		if (mapPartition == null) {
 			throw new NullPointerException("MapPartition function must not be null.");
 		}
-		return new MapPartitionOperator<T, R>(this, mapPartition);
+		TypeInformation<R> resultType = TypeExtractor.getMapPartitionReturnTypes(mapPartition, this.getType());
+		return new MapPartitionOperator<T, R>(this, resultType, mapPartition);
 	}
 	
 	/**
@@ -194,7 +200,8 @@ public abstract class DataSet<T> {
 		if (FunctionUtils.isLambdaFunction(flatMapper)) {
 			throw new UnsupportedLambdaExpressionException();
 		}
-		return new FlatMapOperator<T, R>(this, flatMapper);
+		TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, this.getType());
+		return new FlatMapOperator<T, R>(this, resultType, flatMapper);
 	}
 	
 	/**
@@ -340,7 +347,8 @@ public abstract class DataSet<T> {
 		if (FunctionUtils.isLambdaFunction(reducer)) {
 			throw new UnsupportedLambdaExpressionException();
 		}
-		return new GroupReduceOperator<T, R>(this, reducer);
+		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getType());
+		return new GroupReduceOperator<T, R>(this, resultType, reducer);
 	}
 
 /**
@@ -400,7 +408,8 @@ public abstract class DataSet<T> {
 	 * @return A DistinctOperator that represents the distinct DataSet.
 	 */
 	public <K> DistinctOperator<T> distinct(KeySelector<T, K> keyExtractor) {
-		return new DistinctOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType()));
+		TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
+		return new DistinctOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType(), keyType));
 	}
 	
 	/**
@@ -456,9 +465,9 @@ public abstract class DataSet<T> {
 	 * @see org.apache.flink.api.java.operators.GroupReduceOperator
 	 * @see DataSet
 	 */
-
 	public <K> UnsortedGrouping<T> groupBy(KeySelector<T, K> keyExtractor) {
-		return new UnsortedGrouping<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType()));
+		TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
+		return new UnsortedGrouping<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType(), keyType));
 	}
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6bbe2a05/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 899fc09..91ed28d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -367,7 +367,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 		 * @see DataSet
 		 */
 		public <K> CoGroupOperatorSetsPredicate where(KeySelector<I1, K> keyExtractor) {
-			return new CoGroupOperatorSetsPredicate(new Keys.SelectorFunctionKeys<I1, K>(keyExtractor, input1.getType()));
+			TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, input1.getType());
+			return new CoGroupOperatorSetsPredicate(new Keys.SelectorFunctionKeys<I1, K>(keyExtractor, input1.getType(), keyType));
 		}
 
 		// ----------------------------------------------------------------------------------------
@@ -431,7 +432,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 			 *           Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.common.functions.CoGroupFunction)} to finalize the CoGroup transformation.
 			 */
 			public <K> CoGroupOperatorWithoutFunction equalTo(KeySelector<I2, K> keyExtractor) {
-				return createCoGroupOperator(new Keys.SelectorFunctionKeys<I2, K>(keyExtractor, input2.getType()));
+				TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, input2.getType());
+				return createCoGroupOperator(new Keys.SelectorFunctionKeys<I2, K>(keyExtractor, input2.getType(), keyType));
 			}
 
 			/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6bbe2a05/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
index 8e531d4..4d6d30a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/FlatMapOperator.java
@@ -22,9 +22,9 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.types.TypeInformation;
 
 /**
  * This operator represents the application of a "flatMap" function on a data set, and the
@@ -38,8 +38,8 @@ public class FlatMapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, Fl
 	protected final FlatMapFunction<IN, OUT> function;
 	
 	
-	public FlatMapOperator(DataSet<IN> input, FlatMapFunction<IN, OUT> function) {
-		super(input, TypeExtractor.getFlatMapReturnTypes(function, input.getType()));
+	public FlatMapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, FlatMapFunction<IN, OUT> function) {
+		super(input, resultType);
 		
 		this.function = function;
 		extractSemanticAnnotationsFromUdf(function.getClass());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6bbe2a05/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index 7b0ad4d..9de8bd3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -32,7 +32,6 @@ import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.types.TypeInformation;
 
 import org.apache.flink.api.java.DataSet;
@@ -58,8 +57,8 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 	 * @param input The input data set to the groupReduce function.
 	 * @param function The user-defined GroupReduce function.
 	 */
-	public GroupReduceOperator(DataSet<IN> input, GroupReduceFunction<IN, OUT> function) {
-		super(input, TypeExtractor.getGroupReduceReturnTypes(function, input.getType()));
+	public GroupReduceOperator(DataSet<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function) {
+		super(input, resultType);
 		
 		this.function = function;
 		this.grouper = null;
@@ -73,8 +72,8 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 	 * @param input The grouped input to be processed group-wise by the groupReduce function.
 	 * @param function The user-defined GroupReduce function.
 	 */
-	public GroupReduceOperator(Grouping<IN> input, GroupReduceFunction<IN, OUT> function) {
-		super(input != null ? input.getDataSet() : null, TypeExtractor.getGroupReduceReturnTypes(function, input.getDataSet().getType()));
+	public GroupReduceOperator(Grouping<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function) {
+		super(input != null ? input.getDataSet() : null, resultType);
 		
 		this.function = function;
 		this.grouper = input;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6bbe2a05/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 1ca2ec9..15447d8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -757,7 +757,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		 * @see DataSet
 		 */
 		public <K> JoinOperatorSetsPredicate where(KeySelector<I1, K> keySelector) {
-			return new JoinOperatorSetsPredicate(new Keys.SelectorFunctionKeys<I1, K>(keySelector, input1.getType()));
+			TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+			return new JoinOperatorSetsPredicate(new Keys.SelectorFunctionKeys<I1, K>(keySelector, input1.getType(), keyType));
 		}
 		
 		// ----------------------------------------------------------------------------------------
@@ -829,7 +830,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			 * @return A DefaultJoin that represents the joined DataSet.
 			 */
 			public <K> DefaultJoin<I1, I2> equalTo(KeySelector<I2, K> keySelector) {
-				return createJoinOperator(new Keys.SelectorFunctionKeys<I2, K>(keySelector, input2.getType()));
+				TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
+				return createJoinOperator(new Keys.SelectorFunctionKeys<I2, K>(keySelector, input2.getType(), keyType));
 			}
 			
 			protected DefaultJoin<I1, I2> createJoinOperator(Keys<I2> keys2) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6bbe2a05/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 8019cc8..9ac2b2b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -126,19 +126,18 @@ public abstract class Keys<T> {
 
 		private final KeySelector<T, K> keyExtractor;
 		private final TypeInformation<K> keyType;
-		
-		public SelectorFunctionKeys(KeySelector<T, K> keyExtractor, TypeInformation<T> type) {
+
+		public SelectorFunctionKeys(KeySelector<T, K> keyExtractor, TypeInformation<T> inputType, TypeInformation<K> keyType) {
 			if (keyExtractor == null) {
 				throw new NullPointerException("Key extractor must not be null.");
 			}
-			
+
 			this.keyExtractor = keyExtractor;
-			this.keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
-			
+			this.keyType = keyType;
+
 			if (!this.keyType.isKeyType()) {
 				throw new IllegalArgumentException("Invalid type of KeySelector keys");
 			}
-			
 		}
 
 		public TypeInformation<K> getKeyType() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6bbe2a05/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
index eccdeec..b27201d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
@@ -22,9 +22,9 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.types.TypeInformation;
 
 /**
  * This operator represents the application of a "map" function on a data set, and the
@@ -40,9 +40,9 @@ public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOpe
 	protected final MapFunction<IN, OUT> function;
 	
 	
-	public MapOperator(DataSet<IN> input, MapFunction<IN, OUT> function) {
+	public MapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapFunction<IN, OUT> function) {
 
-		super(input, TypeExtractor.getMapReturnTypes(function, input.getType()));
+		super(input, resultType);
 		
 		this.function = function;
 		extractSemanticAnnotationsFromUdf(function.getClass());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6bbe2a05/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
index caf55f9..5b999bf 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.functions.MapPartitionFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.types.TypeInformation;
 
 /**
  * This operator represents the application of a "mapPartition" function on a data set, and the
@@ -39,8 +39,8 @@ public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 	protected final MapPartitionFunction<IN, OUT> function;
 	
 	
-	public MapPartitionOperator(DataSet<IN> input, MapPartitionFunction<IN, OUT> function) {
-		super(input, TypeExtractor.getMapPartitionReturnTypes(function, input.getType()));
+	public MapPartitionOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapPartitionFunction<IN, OUT> function) {
+		super(input, resultType);
 		
 		this.function = function;
 		extractSemanticAnnotationsFromUdf(function.getClass());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6bbe2a05/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 767f75a..1647055 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -27,6 +27,9 @@ import org.apache.flink.api.common.operators.Order;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.types.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
 
 /**
  * SortedGrouping is an intermediate step for a transformation on a grouped and sorted DataSet.<br/>
@@ -85,7 +88,8 @@ public class SortedGrouping<T> extends Grouping<T> {
 		if (FunctionUtils.isLambdaFunction(reducer)) {
 			throw new UnsupportedLambdaExpressionException();
 		}
-		return new GroupReduceOperator<T, R>(this, reducer);
+		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getDataSet().getType());
+		return new GroupReduceOperator<T, R>(this, resultType, reducer);
 	}
 
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6bbe2a05/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 702f149..fdc86de 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -29,6 +29,8 @@ import org.apache.flink.api.java.functions.SelectByMaxFunction;
 import org.apache.flink.api.java.functions.SelectByMinFunction;
 import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.types.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 public class UnsortedGrouping<T> extends Grouping<T> {
 
@@ -133,7 +135,9 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 		if (FunctionUtils.isLambdaFunction(reducer)) {
 			throw new UnsupportedLambdaExpressionException();
 		}
-		return new GroupReduceOperator<T, R>(this, reducer);
+		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getDataSet().getType());
+
+		return new GroupReduceOperator<T, R>(this, resultType, reducer);
 	}
 
 	/**