You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/04/03 21:33:13 UTC

[4/5] flink git commit: [FLINK-1776] Add offsets to field indexes of semantic properties for operators with key selectors

[FLINK-1776] Add offsets to field indexes of semantic properties for operators with key selectors

This closes #532


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f39aec82
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f39aec82
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f39aec82

Branch: refs/heads/master
Commit: f39aec82d6cd1286f129b11366d101cb646716ee
Parents: dda8565
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Mar 24 15:18:21 2015 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Apr 3 20:41:40 2015 +0200

----------------------------------------------------------------------
 .../api/java/functions/SemanticPropUtil.java    |   79 ++
 .../api/java/operators/CoGroupOperator.java     |   25 +
 .../java/operators/GroupCombineOperator.java    |   23 +
 .../api/java/operators/GroupReduceOperator.java |   22 +
 .../flink/api/java/operators/JoinOperator.java  |   23 +
 .../api/java/operators/ReduceOperator.java      |   22 +
 .../translation/TwoKeyExtractingMapper.java     |    3 +-
 .../java/functions/SemanticPropUtilTest.java    | 1008 ++++++++++--------
 .../api/java/operator/CoGroupOperatorTest.java  |  132 +++
 .../java/operator/GroupCombineOperatorTest.java |  345 ++++++
 .../java/operator/GroupReduceOperatorTest.java  |  345 ++++++
 .../api/java/operator/JoinOperatorTest.java     |  183 +++-
 .../api/java/operator/ReduceOperatorTest.java   |  238 +++++
 .../javaApiOperators/GroupReduceITCase.java     |   58 +
 14 files changed, 2054 insertions(+), 452 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
index 3343671..4569be3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
@@ -153,6 +153,85 @@ public class SemanticPropUtil {
 		return dsp;
 	}
 
+	/**
+	 * Creates SemanticProperties by adding an offset to each input field index of the given SemanticProperties.
+	 *
+	 * @param props The SemanticProperties to which the offset is added.
+	 * @param numInputFields The original number of fields of the input.
+	 * @param offset The offset that is added to each input field index.
+	 * @return New SemanticProperties with added offset.
+	 */
+	public static SingleInputSemanticProperties addSourceFieldOffset(SingleInputSemanticProperties props,
+																		int numInputFields, int offset) {
+
+		SingleInputSemanticProperties offsetProps = new SingleInputSemanticProperties();
+		if (props.getReadFields(0) != null) {
+			FieldSet offsetReadFields = new FieldSet();
+			for (int r : props.getReadFields(0)) {
+				offsetReadFields = offsetReadFields.addField(r + offset);
+			}
+			offsetProps.addReadFields(offsetReadFields);
+		}
+		for (int s = 0; s < numInputFields; s++) {
+			FieldSet targetFields = props.getForwardingTargetFields(0, s);
+			for (int t : targetFields) {
+				offsetProps.addForwardedField(s + offset, t);
+			}
+		}
+		return offsetProps;
+	}
+
+	/**
+	 * Creates SemanticProperties by adding offsets to each input field index of the given SemanticProperties.
+	 *
+	 * @param props The SemanticProperties to which the offset is added.
+	 * @param numInputFields1 The original number of fields of the first input.
+	 * @param numInputFields2 The original number of fields of the second input.
+	 * @param offset1 The offset that is added to each input field index of the first input.
+	 * @param offset2 The offset that is added to each input field index of the second input.
+	 * @return New SemanticProperties with added offsets.
+	 */
+	public static DualInputSemanticProperties addSourceFieldOffsets(DualInputSemanticProperties props,
+																	int numInputFields1, int numInputFields2,
+																	int offset1, int offset2) {
+
+		DualInputSemanticProperties offsetProps = new DualInputSemanticProperties();
+
+		// add offset to read fields on first input
+		if(props.getReadFields(0) != null) {
+			FieldSet offsetReadFields = new FieldSet();
+			for(int r : props.getReadFields(0)) {
+				offsetReadFields = offsetReadFields.addField(r+offset1);
+			}
+			offsetProps.addReadFields(0, offsetReadFields);
+		}
+		// add offset to read fields on second input
+		if(props.getReadFields(1) != null) {
+			FieldSet offsetReadFields = new FieldSet();
+			for(int r : props.getReadFields(1)) {
+				offsetReadFields = offsetReadFields.addField(r+offset2);
+			}
+			offsetProps.addReadFields(1, offsetReadFields);
+		}
+
+		// add offset to forward fields on first input
+		for(int s = 0; s < numInputFields1; s++) {
+			FieldSet targetFields = props.getForwardingTargetFields(0, s);
+			for(int t : targetFields) {
+				offsetProps.addForwardedField(0, s + offset1, t);
+			}
+		}
+		// add offset to forward fields on second input
+		for(int s = 0; s < numInputFields2; s++) {
+			FieldSet targetFields = props.getForwardingTargetFields(1, s);
+			for(int t : targetFields) {
+				offsetProps.addForwardedField(1, s + offset2, t);
+			}
+		}
+
+		return offsetProps;
+	}
+
 	public static SingleInputSemanticProperties getSemanticPropsSingle(
 			Set<Annotation> set, TypeInformation<?> inType, TypeInformation<?> outType) {
 		if (set == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index a051eb0..115a238 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
@@ -40,6 +41,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
@@ -129,6 +131,29 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 		return function;
 	}
 
+	@Override
+	public DualInputSemanticProperties getSemanticProperties() {
+
+		DualInputSemanticProperties props = super.getSemanticProperties();
+
+		// offset semantic information by extracted key fields
+		if(props != null &&
+					(this.keys1 instanceof Keys.SelectorFunctionKeys ||
+					this.keys2 instanceof Keys.SelectorFunctionKeys)) {
+
+			int numFields1 = this.getInput1Type().getTotalFields();
+			int numFields2 = this.getInput2Type().getTotalFields();
+			int offset1 = (this.keys1 instanceof Keys.SelectorFunctionKeys) ?
+					((Keys.SelectorFunctionKeys) this.keys1).getKeyType().getTotalFields() : 0;
+			int offset2 = (this.keys2 instanceof Keys.SelectorFunctionKeys) ?
+					((Keys.SelectorFunctionKeys) this.keys2).getKeyType().getTotalFields() : 0;
+
+			props = SemanticPropUtil.addSourceFieldOffsets(props, numFields1, numFields2, offset1, offset2);
+		}
+
+		return props;
+	}
+
 	protected Keys<I1> getKeys1() {
 		return this.keys1;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
index 911c608..dc26fec 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -23,11 +23,13 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingGroupCombineOperator;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedGroupCombineOperator;
@@ -86,6 +88,27 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 		return function;
 	}
 
+	@Override
+	public SingleInputSemanticProperties getSemanticProperties() {
+
+		SingleInputSemanticProperties props = super.getSemanticProperties();
+
+		// offset semantic information by extracted key fields
+		if(props != null &&
+				this.grouper != null &&
+				this.grouper.keys instanceof Keys.SelectorFunctionKeys) {
+
+			int offset = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+			if(this.grouper instanceof SortedGrouping) {
+				offset += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+			}
+
+			props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset);
+		}
+
+		return props;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Translation
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index c96b7c6..bc4413f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -24,11 +24,13 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedReduceGroupOperator;
@@ -120,6 +122,26 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 		return this;
 	}
 
+	@Override
+	public SingleInputSemanticProperties getSemanticProperties() {
+
+		SingleInputSemanticProperties props = super.getSemanticProperties();
+
+		// offset semantic information by extracted key fields
+		if(props != null &&
+				this.grouper != null &&
+				this.grouper.keys instanceof Keys.SelectorFunctionKeys) {
+
+			int offset = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+			if(this.grouper instanceof SortedGrouping) {
+				offset += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+			}
+			props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset);
+		}
+
+		return props;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Translation
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index e450ae1..4adf6b3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -225,6 +225,29 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		@Override
+		public DualInputSemanticProperties getSemanticProperties() {
+
+			DualInputSemanticProperties props = super.getSemanticProperties();
+
+			// offset semantic information by extracted key fields
+			if(props != null &&
+					(this.keys1 instanceof Keys.SelectorFunctionKeys ||
+							this.keys2 instanceof Keys.SelectorFunctionKeys)) {
+
+				int numFields1 = this.getInput1Type().getTotalFields();
+				int numFields2 = this.getInput2Type().getTotalFields();
+				int offset1 = (this.keys1 instanceof Keys.SelectorFunctionKeys) ?
+						((Keys.SelectorFunctionKeys) this.keys1).getKeyType().getTotalFields() : 0;
+				int offset2 = (this.keys2 instanceof Keys.SelectorFunctionKeys) ?
+						((Keys.SelectorFunctionKeys) this.keys2).getKeyType().getTotalFields() : 0;
+
+				props = SemanticPropUtil.addSourceFieldOffsets(props, numFields1, numFields2, offset1, offset2);
+			}
+
+			return props;
+		}
+
+		@Override
 		protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class<?> udfClass) {
 			if (function instanceof DefaultJoin.WrappingFlatJoinFunction) {
 				return super.extractSemanticAnnotationsFromUdf(((WrappingFunction<?>) function).getWrappedFunction().getClass());

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index 5951df8..e770278 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -21,10 +21,12 @@ package org.apache.flink.api.java.operators;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator;
@@ -78,6 +80,26 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 	}
 
 	@Override
+	public SingleInputSemanticProperties getSemanticProperties() {
+
+		SingleInputSemanticProperties props = super.getSemanticProperties();
+
+		// offset semantic information by extracted key fields
+		if(props != null &&
+				this.grouper != null &&
+				this.grouper.keys instanceof Keys.SelectorFunctionKeys) {
+
+			int offset = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+			if(this.grouper instanceof SortedGrouping) {
+				offset += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+			}
+			props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset);
+		}
+
+		return props;
+	}
+
+	@Override
 	protected org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> translateToDataFlow(Operator<IN> input) {
 		
 		String name = getName() != null ? getName() : "Reduce at "+defaultName;

http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
index 48d2d1a..a98c899 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
@@ -18,11 +18,12 @@
 
 package org.apache.flink.api.java.operators.translation;
 
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 
-
+@ForwardedFields("*->2")
 public final class TwoKeyExtractingMapper<T, K1, K2> extends RichMapFunction<T, Tuple3<K1, K2, T>> {
 
 	private static final long serialVersionUID = 1L;