You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/04/03 21:33:13 UTC
[4/5] flink git commit: [FLINK-1776] Add offsets to field indexes of
semantic properties for operators with key selectors
[FLINK-1776] Add offsets to field indexes of semantic properties for operators with key selectors
This closes #532
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f39aec82
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f39aec82
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f39aec82
Branch: refs/heads/master
Commit: f39aec82d6cd1286f129b11366d101cb646716ee
Parents: dda8565
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Mar 24 15:18:21 2015 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Apr 3 20:41:40 2015 +0200
----------------------------------------------------------------------
.../api/java/functions/SemanticPropUtil.java | 79 ++
.../api/java/operators/CoGroupOperator.java | 25 +
.../java/operators/GroupCombineOperator.java | 23 +
.../api/java/operators/GroupReduceOperator.java | 22 +
.../flink/api/java/operators/JoinOperator.java | 23 +
.../api/java/operators/ReduceOperator.java | 22 +
.../translation/TwoKeyExtractingMapper.java | 3 +-
.../java/functions/SemanticPropUtilTest.java | 1008 ++++++++++--------
.../api/java/operator/CoGroupOperatorTest.java | 132 +++
.../java/operator/GroupCombineOperatorTest.java | 345 ++++++
.../java/operator/GroupReduceOperatorTest.java | 345 ++++++
.../api/java/operator/JoinOperatorTest.java | 183 +++-
.../api/java/operator/ReduceOperatorTest.java | 238 +++++
.../javaApiOperators/GroupReduceITCase.java | 58 +
14 files changed, 2054 insertions(+), 452 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
index 3343671..4569be3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
@@ -153,6 +153,85 @@ public class SemanticPropUtil {
return dsp;
}
+ /**
+  * Creates SemanticProperties in which each input field index of the given SemanticProperties is shifted by an offset.
+  *
+  * @param props The SemanticProperties whose read fields and forwarded source fields are shifted.
+  * @param numInputFields The original number of input fields; the source fields 0 to numInputFields - 1 are checked for forwarding targets.
+  * @param offset The offset that is added to each input field index; target field indexes are copied unchanged.
+  * @return New SemanticProperties holding the shifted read fields and forwarded fields.
+  */
+ public static SingleInputSemanticProperties addSourceFieldOffset(SingleInputSemanticProperties props,
+         int numInputFields, int offset) {
+     SingleInputSemanticProperties shifted = new SingleInputSemanticProperties();
+     FieldSet readFields = props.getReadFields(0);
+     // read fields are only added to the result if the given properties provide them
+     if (readFields != null) {
+         FieldSet shiftedReads = new FieldSet();
+         for (int readField : readFields) {
+             shiftedReads = shiftedReads.addField(readField + offset);
+         }
+         shifted.addReadFields(shiftedReads);
+     }
+     for (int source = 0; source < numInputFields; source++) {
+         for (int target : props.getForwardingTargetFields(0, source)) {
+             shifted.addForwardedField(source + offset, target);
+         }
+     }
+     return shifted;
+ }
+
+ /**
+  * Creates SemanticProperties in which the input field indexes of both inputs of the given SemanticProperties
+  * are shifted by per-input offsets. Target field indexes are copied unchanged.
+  *
+  * @param props The SemanticProperties whose read fields and forwarded source fields are shifted.
+  * @param numInputFields1 The original number of fields of the first input.
+  * @param numInputFields2 The original number of fields of the second input.
+  * @param offset1 The offset that is added to each input field index of the first input.
+  * @param offset2 The offset that is added to each input field index of the second input.
+  * @return New SemanticProperties holding the shifted read fields and forwarded fields of both inputs.
+  */
+ public static DualInputSemanticProperties addSourceFieldOffsets(DualInputSemanticProperties props,
+         int numInputFields1, int numInputFields2,
+         int offset1, int offset2) {
+
+     // per-input settings, indexed by the input id (0 = first input, 1 = second input)
+     final int[] fieldCounts = {numInputFields1, numInputFields2};
+     final int[] offsets = {offset1, offset2};
+
+     DualInputSemanticProperties shifted = new DualInputSemanticProperties();
+
+     // shift the read fields, first input before second input;
+     // read fields are only added to the result if the given properties provide them
+     for (int input = 0; input < 2; input++) {
+         FieldSet readFields = props.getReadFields(input);
+         if (readFields == null) {
+             continue;
+         }
+         int offset = offsets[input];
+         FieldSet shiftedReads = new FieldSet();
+         for (int readField : readFields) {
+             shiftedReads = shiftedReads.addField(readField + offset);
+         }
+         shifted.addReadFields(input, shiftedReads);
+     }
+
+     // shift the source index of each forwarded field, first input before second input;
+     // only the source fields 0 to (number of input fields - 1) of an input are checked
+     for (int input = 0; input < 2; input++) {
+         int offset = offsets[input];
+         for (int source = 0; source < fieldCounts[input]; source++) {
+             FieldSet targets = props.getForwardingTargetFields(input, source);
+             for (int target : targets) {
+                 shifted.addForwardedField(input, source + offset, target);
+             }
+         }
+     }
+
+     return shifted;
+ }
+
public static SingleInputSemanticProperties getSemanticPropsSingle(
Set<Annotation> set, TypeInformation<?> inType, TypeInformation<?> outType) {
if (set == null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index a051eb0..115a238 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
@@ -40,6 +41,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
@@ -129,6 +131,29 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
return function;
}
+ @Override
+ public DualInputSemanticProperties getSemanticProperties() {
+
+     final DualInputSemanticProperties udfProps = super.getSemanticProperties();
+     final boolean selector1 = this.keys1 instanceof Keys.SelectorFunctionKeys;
+     final boolean selector2 = this.keys2 instanceof Keys.SelectorFunctionKeys;
+
+     // nothing to offset without semantic information or without any key selector
+     if (udfProps == null || !(selector1 || selector2)) {
+         return udfProps;
+     }
+
+     // an input keyed by a key selector has its field indexes offset by the
+     // number of fields of the extracted key, any other input keeps offset 0
+     final int numFields1 = this.getInput1Type().getTotalFields();
+     final int numFields2 = this.getInput2Type().getTotalFields();
+     final int offset1 = selector1 ?
+             ((Keys.SelectorFunctionKeys) this.keys1).getKeyType().getTotalFields() : 0;
+     final int offset2 = selector2 ?
+             ((Keys.SelectorFunctionKeys) this.keys2).getKeyType().getTotalFields() : 0;
+     return SemanticPropUtil.addSourceFieldOffsets(udfProps, numFields1, numFields2, offset1, offset2);
+ }
+
protected Keys<I1> getKeys1() {
return this.keys1;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
index 911c608..dc26fec 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -23,11 +23,13 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingGroupCombineOperator;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedGroupCombineOperator;
@@ -86,6 +88,27 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
return function;
}
+ @Override
+ public SingleInputSemanticProperties getSemanticProperties() {
+
+     final SingleInputSemanticProperties udfProps = super.getSemanticProperties();
+
+     // nothing to offset without semantic information or without a key selector grouping
+     if (udfProps == null || this.grouper == null ||
+             !(this.grouper.keys instanceof Keys.SelectorFunctionKeys)) {
+         return udfProps;
+     }
+
+     // offset the input field indexes by the number of fields of the extracted grouping key,
+     // plus, for a SortedGrouping, the number of fields of its sort selection function key
+     int keyFields = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+     if (this.grouper instanceof SortedGrouping) {
+         keyFields += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+     }
+
+     return SemanticPropUtil.addSourceFieldOffset(udfProps, this.getInputType().getTotalFields(), keyFields);
+ }
+
// --------------------------------------------------------------------------------------------
// Translation
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index c96b7c6..bc4413f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -24,11 +24,13 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedReduceGroupOperator;
@@ -120,6 +122,26 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
return this;
}
+ @Override
+ public SingleInputSemanticProperties getSemanticProperties() {
+
+     final SingleInputSemanticProperties udfProps = super.getSemanticProperties();
+
+     // nothing to offset without semantic information or without a key selector grouping
+     if (udfProps == null || this.grouper == null ||
+             !(this.grouper.keys instanceof Keys.SelectorFunctionKeys)) {
+         return udfProps;
+     }
+
+     // offset the input field indexes by the number of fields of the extracted grouping key,
+     // plus, for a SortedGrouping, the number of fields of its sort selection function key
+     int keyFields = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+     if (this.grouper instanceof SortedGrouping) {
+         keyFields += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+     }
+     return SemanticPropUtil.addSourceFieldOffset(udfProps, this.getInputType().getTotalFields(), keyFields);
+ }
+
// --------------------------------------------------------------------------------------------
// Translation
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index e450ae1..4adf6b3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -225,6 +225,29 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
}
@Override
+ public DualInputSemanticProperties getSemanticProperties() {
+
+     final DualInputSemanticProperties udfProps = super.getSemanticProperties();
+     final boolean selector1 = this.keys1 instanceof Keys.SelectorFunctionKeys;
+     final boolean selector2 = this.keys2 instanceof Keys.SelectorFunctionKeys;
+
+     // nothing to offset without semantic information or without any key selector
+     if (udfProps == null || !(selector1 || selector2)) {
+         return udfProps;
+     }
+
+     // an input keyed by a key selector has its field indexes offset by the
+     // number of fields of the extracted key, any other input keeps offset 0
+     final int numFields1 = this.getInput1Type().getTotalFields();
+     final int numFields2 = this.getInput2Type().getTotalFields();
+     final int offset1 = selector1 ?
+             ((Keys.SelectorFunctionKeys) this.keys1).getKeyType().getTotalFields() : 0;
+     final int offset2 = selector2 ?
+             ((Keys.SelectorFunctionKeys) this.keys2).getKeyType().getTotalFields() : 0;
+     return SemanticPropUtil.addSourceFieldOffsets(udfProps, numFields1, numFields2, offset1, offset2);
+ }
+
+ @Override
protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class<?> udfClass) {
if (function instanceof DefaultJoin.WrappingFlatJoinFunction) {
return super.extractSemanticAnnotationsFromUdf(((WrappingFunction<?>) function).getWrappedFunction().getClass());
http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index 5951df8..e770278 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -21,10 +21,12 @@ package org.apache.flink.api.java.operators;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.SemanticPropUtil;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator;
@@ -78,6 +80,26 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
}
@Override
+ public SingleInputSemanticProperties getSemanticProperties() {
+
+     final SingleInputSemanticProperties udfProps = super.getSemanticProperties();
+
+     // nothing to offset without semantic information or without a key selector grouping
+     if (udfProps == null || this.grouper == null ||
+             !(this.grouper.keys instanceof Keys.SelectorFunctionKeys)) {
+         return udfProps;
+     }
+
+     // offset the input field indexes by the number of fields of the extracted grouping key,
+     // plus, for a SortedGrouping, the number of fields of its sort selection function key
+     int keyFields = ((Keys.SelectorFunctionKeys) this.grouper.keys).getKeyType().getTotalFields();
+     if (this.grouper instanceof SortedGrouping) {
+         keyFields += ((SortedGrouping) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
+     }
+     return SemanticPropUtil.addSourceFieldOffset(udfProps, this.getInputType().getTotalFields(), keyFields);
+ }
+
+ @Override
protected org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> translateToDataFlow(Operator<IN> input) {
String name = getName() != null ? getName() : "Reduce at "+defaultName;
http://git-wip-us.apache.org/repos/asf/flink/blob/f39aec82/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
index 48d2d1a..a98c899 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
@@ -18,11 +18,12 @@
package org.apache.flink.api.java.operators.translation;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
-
+@ForwardedFields("*->2")
public final class TwoKeyExtractingMapper<T, K1, K2> extends RichMapFunction<T, Tuple3<K1, K2, T>> {
private static final long serialVersionUID = 1L;