You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/09 16:20:53 UTC
[4/6] flink git commit: [FLINK-2576] [javaAPI] [scalaAPI] [optimizer]
Add outerJoin to DataSet API (Java, Scala) and optimizer.
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index e76e3c9..7f15f9e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -18,7 +18,6 @@
package org.apache.flink.api.java.operators;
-import java.security.InvalidParameterException;
import java.util.Arrays;
import com.google.common.base.Preconditions;
@@ -34,22 +33,25 @@ import org.apache.flink.api.common.operators.DualInputSemanticProperties;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
-import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.functions.SemanticPropUtil;
import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.java.operators.join.JoinType;
+import org.apache.flink.api.java.operators.join.JoinFunctionAssigner;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
-import org.apache.flink.api.java.operators.translation.PlanBothUnwrappingJoinOperator;
-import org.apache.flink.api.java.operators.translation.PlanLeftUnwrappingJoinOperator;
-import org.apache.flink.api.java.operators.translation.PlanRightUnwrappingJoinOperator;
+import org.apache.flink.api.java.operators.translation.TupleRightUnwrappingJoiner;
+import org.apache.flink.api.java.operators.translation.TupleLeftUnwrappingJoiner;
+import org.apache.flink.api.java.operators.translation.TupleUnwrappingJoiner;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -69,18 +71,19 @@ import org.apache.flink.api.java.tuple.*;
* @see DataSet
*/
public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT, JoinOperator<I1, I2, OUT>> {
-
+
protected final Keys<I1> keys1;
protected final Keys<I2> keys2;
private final JoinHint joinHint;
-
+ protected final JoinType joinType;
+
private Partitioner<?> customPartitioner;
- protected JoinOperator(DataSet<I1> input1, DataSet<I2> input2,
+ protected JoinOperator(DataSet<I1> input1, DataSet<I2> input2,
Keys<I1> keys1, Keys<I2> keys2,
- TypeInformation<OUT> returnType, JoinHint hint)
+ TypeInformation<OUT> returnType, JoinHint hint, JoinType type)
{
super(input1, input2, returnType);
@@ -116,7 +119,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
this.keys1 = keys1;
this.keys2 = keys2;
- this.joinHint = hint == null ? JoinHint.OPTIMIZER_CHOOSES : hint;
+ this.joinHint = hint == null ? InnerJoinOperatorBase.JoinHint.OPTIMIZER_CHOOSES : hint;
+ this.joinType = type;
}
protected Keys<I1> getKeys1() {
@@ -135,6 +139,15 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
public JoinHint getJoinHint() {
return this.joinHint;
}
+
+ /**
+ * Gets the JoinType that describes this join operation (e.g., inner, or one of the outer variants).
+ *
+ * @return The JoinType
+ */
+ public JoinType getJoinType() {
+ return this.joinType;
+ }
/**
* Sets a custom partitioner for this join. The partitioner will be called on the join keys to determine
@@ -189,12 +202,23 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
private boolean preserve2;
private final String joinLocationName;
-
+
public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> function,
- TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName)
- {
- super(input1, input2, keys1, keys2, returnType, hint);
+ TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName) {
+ this(input1, input2, keys1, keys2, function, returnType, hint, joinLocationName, JoinType.INNER);
+ }
+
+ public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
+ Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> generatedFunction, JoinFunction<I1, I2, OUT> function,
+ TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName) {
+ this(input1, input2, keys1, keys2, generatedFunction, function, returnType, hint, joinLocationName, JoinType.INNER);
+ }
+
+ public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
+ Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> function,
+ TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName, JoinType type) {
+ super(input1, input2, keys1, keys2, returnType, hint, type);
if (function == null) {
throw new NullPointerException();
@@ -208,9 +232,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> generatedFunction, JoinFunction<I1, I2, OUT> function,
- TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName)
- {
- super(input1, input2, keys1, keys2, returnType, hint);
+ TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName, JoinType type) {
+ super(input1, input2, keys1, keys2, returnType, hint, type);
this.joinLocationName = joinLocationName;
@@ -282,232 +305,220 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
@Override
protected JoinOperatorBase<?, ?, OUT, ?> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
+ String name = getName() != null ? getName() : "Join at " + joinLocationName;
+
+ JoinOperatorBaseBuilder<OUT> builder = new JoinOperatorBaseBuilder<OUT>(name, joinType)
+ .withParallelism(getParallelism())
+ .withPartitioner(getPartitioner())
+ .withJoinHint(getJoinHint())
+ .withResultType(getResultType());
+
+ final boolean requiresTupleUnwrapping = keys1 instanceof Keys.SelectorFunctionKeys || keys2 instanceof Keys.SelectorFunctionKeys;
+ if (requiresTupleUnwrapping) {
+ if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 instanceof Keys.SelectorFunctionKeys) {
+ // Both join sides have a key selector function, so we need to do the
+ // tuple wrapping/unwrapping on both sides.
+
+ @SuppressWarnings("unchecked")
+ Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1;
+ @SuppressWarnings("unchecked")
+ Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
+
+ builder = builder
+ .withUdf(new TupleUnwrappingJoiner<>(function))
+ .withWrappedInput1(input1, selectorKeys1, getInput1Type())
+ .withWrappedInput2(input2, selectorKeys2, getInput2Type());
+ } else if (keys2 instanceof Keys.SelectorFunctionKeys) {
+ // The right side of the join needs the tuple wrapping/unwrapping
+
+ @SuppressWarnings("unchecked")
+ Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
+
+ builder = builder
+ .withUdf(new TupleRightUnwrappingJoiner<>(function))
+ .withInput1(input1, getInput1Type(), keys1)
+ .withWrappedInput2(input2, selectorKeys2, getInput2Type());
+ } else {
+ // The left side of the join needs the tuple wrapping/unwrapping
- String name = getName() != null ? getName() : "Join at "+joinLocationName;
+ @SuppressWarnings("unchecked")
+ Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1;
- final JoinOperatorBase<?, ?, OUT, ?> translated;
-
- if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 instanceof Keys.SelectorFunctionKeys) {
- // Both join sides have a key selector function, so we need to do the
- // tuple wrapping/unwrapping on both sides.
+ builder = builder
+ .withUdf(new TupleLeftUnwrappingJoiner<>(function))
+ .withWrappedInput1(input1, selectorKeys1, getInput1Type())
+ .withInput2(input2, getInput2Type(), keys2);
+ }
+ } else if (keys1 instanceof Keys.ExpressionKeys && keys2 instanceof Keys.ExpressionKeys) {
+ // Neither side needs the tuple wrapping/unwrapping
- @SuppressWarnings("unchecked")
- Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1;
- @SuppressWarnings("unchecked")
- Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
-
- PlanBothUnwrappingJoinOperator<I1, I2, OUT, ?> po =
- translateSelectorFunctionJoin(selectorKeys1, selectorKeys2, function,
- getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
-
- // set parallelism
- po.setParallelism(this.getParallelism());
-
- translated = po;
+ builder = builder
+ .withUdf(function)
+ .withInput1(input1, getInput1Type(), keys1)
+ .withInput2(input2, getInput2Type(), keys2);
+ } else {
+ throw new UnsupportedOperationException("Unrecognized or incompatible key types.");
}
- else if (keys2 instanceof Keys.SelectorFunctionKeys) {
- // The right side of the join needs the tuple wrapping/unwrapping
- int[] logicalKeyPositions1 = keys1.computeLogicalKeyPositions();
+ return builder.build();
+ }
+
- @SuppressWarnings("unchecked")
- Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 =
- (Keys.SelectorFunctionKeys<I2, ?>) keys2;
+ private static final class JoinOperatorBaseBuilder<OUT> {
+
+ private final String name;
+ private final JoinType joinType;
- PlanRightUnwrappingJoinOperator<I1, I2, OUT, ?> po =
- translateSelectorFunctionJoinRight(logicalKeyPositions1, selectorKeys2,
- function, getInput1Type(), getInput2Type(), getResultType(), name,
- input1, input2);
+ private int parallelism;
+ private FlatJoinFunction<?, ?, OUT> udf;
+ private TypeInformation<OUT> resultType;
- // set parallelism
- po.setParallelism(this.getParallelism());
+ private Operator input1;
+ private TypeInformation<?> input1Type;
+ private Keys<?> keys1;
- translated = po;
+ private Operator input2;
+ private TypeInformation<?> input2Type;
+ private Keys<?> keys2;
+
+ private Partitioner<?> partitioner;
+ private JoinHint joinHint;
+
+ public JoinOperatorBaseBuilder(String name, JoinType joinType) {
+ this.name = name;
+ this.joinType = joinType;
}
- else if (keys1 instanceof Keys.SelectorFunctionKeys) {
- // The left side of the join needs the tuple wrapping/unwrapping
+ public <I1, K> JoinOperatorBaseBuilder<OUT> withWrappedInput1(
+ Operator<I1> input1,
+ Keys.SelectorFunctionKeys<I1, ?> rawKeys1,
+ TypeInformation<I1> inputType1) {
+ TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = new TupleTypeInfo<>(rawKeys1.getKeyType(), inputType1);
- @SuppressWarnings("unchecked")
- Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 =
- (Keys.SelectorFunctionKeys<I1, ?>) keys1;
+ MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
+ createKeyMapper(rawKeys1, inputType1, input1, "Key Extractor 1");
- int[] logicalKeyPositions2 = keys2.computeLogicalKeyPositions();
+ return this.withInput1(keyMapper1, typeInfoWithKey1, rawKeys1);
+ }
- PlanLeftUnwrappingJoinOperator<I1, I2, OUT, ?> po =
- translateSelectorFunctionJoinLeft(selectorKeys1, logicalKeyPositions2, function,
- getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
+ public <I2, K> JoinOperatorBaseBuilder<OUT> withWrappedInput2(
+ Operator<I2> input2,
+ Keys.SelectorFunctionKeys<I2, ?> rawKeys2,
+ TypeInformation<I2> inputType2) {
+ TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = new TupleTypeInfo<>(rawKeys2.getKeyType(), inputType2);
- // set parallelism
- po.setParallelism(this.getParallelism());
+ MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 =
+ createKeyMapper(rawKeys2, inputType2, input2, "Key Extractor 2");
- translated = po;
+ return withInput2(keyMapper2, typeInfoWithKey2, rawKeys2);
}
- else if (super.keys1 instanceof Keys.ExpressionKeys && super.keys2 instanceof Keys.ExpressionKeys)
- {
- // Neither side needs the tuple wrapping/unwrapping
- int[] logicalKeyPositions1 = super.keys1.computeLogicalKeyPositions();
- int[] logicalKeyPositions2 = super.keys2.computeLogicalKeyPositions();
-
- JoinOperatorBase<I1, I2, OUT, FlatJoinFunction<I1, I2, OUT>> po =
- new JoinOperatorBase<I1, I2, OUT, FlatJoinFunction<I1, I2, OUT>>(function,
- new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), getResultType()),
- logicalKeyPositions1, logicalKeyPositions2,
- name);
-
- // set inputs
- po.setFirstInput(input1);
- po.setSecondInput(input2);
- // set parallelism
- po.setParallelism(this.getParallelism());
-
- translated = po;
+ public <I1> JoinOperatorBaseBuilder<OUT> withInput1(
+ Operator<I1> input1,
+ TypeInformation<I1> input1Type,
+ Keys<?> keys1) {
+ this.input1 = input1;
+ this.input1Type = input1Type;
+ this.keys1 = keys1;
+ return this;
}
- else {
- throw new UnsupportedOperationException("Unrecognized or incompatible key types.");
+
+ public <I2> JoinOperatorBaseBuilder<OUT> withInput2(
+ Operator<I2> input2,
+ TypeInformation<I2> input2Type,
+ Keys<?> keys2) {
+ this.input2 = input2;
+ this.input2Type = input2Type;
+ this.keys2 = keys2;
+ return this;
}
-
- translated.setJoinHint(getJoinHint());
- translated.setCustomPartitioner(getPartitioner());
-
- return translated;
- }
-
- private static <I1, I2, K, OUT> PlanBothUnwrappingJoinOperator<I1, I2, OUT, K> translateSelectorFunctionJoin(
- Keys.SelectorFunctionKeys<I1, ?> rawKeys1, Keys.SelectorFunctionKeys<I2, ?> rawKeys2,
- FlatJoinFunction<I1, I2, OUT> function,
- TypeInformation<I1> inputType1, TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
- Operator<I1> input1, Operator<I2> input2)
- {
- @SuppressWarnings("unchecked")
- final Keys.SelectorFunctionKeys<I1, K> keys1 = (Keys.SelectorFunctionKeys<I1, K>) rawKeys1;
- @SuppressWarnings("unchecked")
- final Keys.SelectorFunctionKeys<I2, K> keys2 = (Keys.SelectorFunctionKeys<I2, K>) rawKeys2;
-
- final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = new TupleTypeInfo<Tuple2<K, I1>>(keys1.getKeyType(), inputType1);
- final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = new TupleTypeInfo<Tuple2<K, I2>>(keys2.getKeyType(), inputType2);
-
- final KeyExtractingMapper<I1, K> extractor1 = new KeyExtractingMapper<I1, K>(keys1.getKeyExtractor());
- final KeyExtractingMapper<I2, K> extractor2 = new KeyExtractingMapper<I2, K>(keys2.getKeyExtractor());
-
- final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
- new MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
- final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 =
- new MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
- final PlanBothUnwrappingJoinOperator<I1, I2, OUT, K> join = new PlanBothUnwrappingJoinOperator<I1, I2, OUT, K>(function, keys1, keys2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);
-
- join.setFirstInput(keyMapper1);
- join.setSecondInput(keyMapper2);
-
- keyMapper1.setInput(input1);
- keyMapper2.setInput(input2);
- // set parallelism
- keyMapper1.setParallelism(input1.getParallelism());
- keyMapper2.setParallelism(input2.getParallelism());
-
- return join;
- }
-
- private static <I1, I2, K, OUT> PlanRightUnwrappingJoinOperator<I1, I2, OUT, K> translateSelectorFunctionJoinRight(
- int[] logicalKeyPositions1,
- Keys.SelectorFunctionKeys<I2, ?> rawKeys2,
- FlatJoinFunction<I1, I2, OUT> function,
- TypeInformation<I1> inputType1,
- TypeInformation<I2> inputType2,
- TypeInformation<OUT> outputType,
- String name,
- Operator<I1> input1,
- Operator<I2> input2) {
-
- if(!inputType1.isTupleType()) {
- throw new InvalidParameterException("Should not happen.");
+
+ public JoinOperatorBaseBuilder<OUT> withParallelism(int parallelism) {
+ this.parallelism = parallelism;
+ return this;
}
-
- @SuppressWarnings("unchecked")
- final Keys.SelectorFunctionKeys<I2, K> keys2 =
- (Keys.SelectorFunctionKeys<I2, K>) rawKeys2;
-
- final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 =
- new TupleTypeInfo<Tuple2<K, I2>>(keys2.getKeyType(), inputType2);
-
- final KeyExtractingMapper<I2, K> extractor2 =
- new KeyExtractingMapper<I2, K>(keys2.getKeyExtractor());
-
- final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 =
- new MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>>(
- extractor2,
- new UnaryOperatorInformation<I2,Tuple2<K, I2>>(inputType2, typeInfoWithKey2),
- "Key Extractor 2");
-
- final PlanRightUnwrappingJoinOperator<I1, I2, OUT, K> join =
- new PlanRightUnwrappingJoinOperator<I1, I2, OUT, K>(
- function,
- logicalKeyPositions1,
- keys2,
- name,
- outputType,
- inputType1,
- typeInfoWithKey2);
-
- join.setFirstInput(input1);
- join.setSecondInput(keyMapper2);
-
- keyMapper2.setInput(input2);
- // set parallelism
- keyMapper2.setParallelism(input2.getParallelism());
-
- return join;
- }
-
- private static <I1, I2, K, OUT> PlanLeftUnwrappingJoinOperator<I1, I2, OUT, K> translateSelectorFunctionJoinLeft(
- Keys.SelectorFunctionKeys<I1, ?> rawKeys1,
- int[] logicalKeyPositions2,
- FlatJoinFunction<I1, I2, OUT> function,
- TypeInformation<I1> inputType1,
- TypeInformation<I2> inputType2,
- TypeInformation<OUT> outputType,
- String name,
- Operator<I1> input1,
- Operator<I2> input2) {
-
- if(!inputType2.isTupleType()) {
- throw new InvalidParameterException("Should not happen.");
+
+ public JoinOperatorBaseBuilder<OUT> withPartitioner(Partitioner<?> partitioner) {
+ this.partitioner = partitioner;
+ return this;
}
-
+
+ public JoinOperatorBaseBuilder<OUT> withJoinHint(JoinHint joinHint) {
+ this.joinHint = joinHint;
+ return this;
+ }
+
+ public JoinOperatorBaseBuilder<OUT> withUdf(FlatJoinFunction<?, ?, OUT> udf) {
+ this.udf = udf;
+ return this;
+ }
+
+ public JoinOperatorBaseBuilder<OUT> withResultType(TypeInformation<OUT> resultType) {
+ this.resultType = resultType;
+ return this;
+ }
+
@SuppressWarnings("unchecked")
- final Keys.SelectorFunctionKeys<I1, K> keys1 = (Keys.SelectorFunctionKeys<I1, K>) rawKeys1;
-
- final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 =
- new TupleTypeInfo<Tuple2<K, I1>>(keys1.getKeyType(), inputType1);
-
- final KeyExtractingMapper<I1, K> extractor1 =
- new KeyExtractingMapper<I1, K>(keys1.getKeyExtractor());
-
- final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
- new MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>>(
- extractor1,
- new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1),
- "Key Extractor 1");
-
- final PlanLeftUnwrappingJoinOperator<I1, I2, OUT, K> join =
- new PlanLeftUnwrappingJoinOperator<I1, I2, OUT, K>(
- function,
- keys1,
- logicalKeyPositions2,
- name,
- outputType,
- typeInfoWithKey1,
- inputType2);
-
- join.setFirstInput(keyMapper1);
- join.setSecondInput(input2);
-
- keyMapper1.setInput(input1);
- // set parallelism
- keyMapper1.setParallelism(input1.getParallelism());
+ public JoinOperatorBase<?, ?, OUT, ?> build() {
+ JoinOperatorBase<?, ?, OUT, ?> operator;
+ if (joinType.isOuter()) {
+ operator = new OuterJoinOperatorBase<>(
+ udf,
+ new BinaryOperatorInformation(input1Type, input2Type, resultType),
+ this.keys1.computeLogicalKeyPositions(),
+ this.keys2.computeLogicalKeyPositions(),
+ this.name,
+ getOuterJoinType());
+ } else {
+ operator = new InnerJoinOperatorBase<>(
+ udf,
+ new BinaryOperatorInformation(input1Type, input2Type, resultType),
+ this.keys1.computeLogicalKeyPositions(),
+ this.keys2.computeLogicalKeyPositions(),
+ this.name);
+ }
+
+ operator.setFirstInput(input1);
+ operator.setSecondInput(input2);
+ operator.setParallelism(parallelism);
+ operator.setCustomPartitioner(partitioner);
+ operator.setJoinHint(joinHint);
+ return operator;
+ }
- return join;
+ private OuterJoinOperatorBase.OuterJoinType getOuterJoinType() {
+ switch (joinType) {
+ case LEFT_OUTER:
+ return OuterJoinOperatorBase.OuterJoinType.LEFT;
+ case RIGHT_OUTER:
+ return OuterJoinOperatorBase.OuterJoinType.RIGHT;
+ case FULL_OUTER:
+ return OuterJoinOperatorBase.OuterJoinType.FULL;
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static <I, K> MapOperatorBase<I, Tuple2<K, I>, MapFunction<I, Tuple2<K, I>>> createKeyMapper(
+ Keys.SelectorFunctionKeys<I, ?> rawKeys,
+ TypeInformation<I> inputType,
+ Operator<I> input,
+ String mapperName) {
+
+ @SuppressWarnings("unchecked")
+ final Keys.SelectorFunctionKeys<I, K> keys = (Keys.SelectorFunctionKeys<I, K>) rawKeys;
+ final TypeInformation<Tuple2<K, I>> typeInfoWithKey = new TupleTypeInfo<>(keys.getKeyType(), inputType);
+ final KeyExtractingMapper<I, K> extractor = new KeyExtractingMapper<>(keys.getKeyExtractor());
+
+ final MapOperatorBase<I, Tuple2<K, I>, MapFunction<I, Tuple2<K, I>>> keyMapper =
+ new MapOperatorBase<I, Tuple2<K, I>, MapFunction<I, Tuple2<K, I>>>(
+ extractor,
+ new UnaryOperatorInformation<>(inputType, typeInfoWithKey),
+ mapperName);
+ keyMapper.setInput(input);
+ keyMapper.setParallelism(input.getParallelism());
+ return keyMapper;
+ }
}
}
@@ -521,16 +532,16 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
* @see Tuple2
* @see DataSet
*/
- public static final class DefaultJoin<I1, I2> extends EquiJoin<I1, I2, Tuple2<I1, I2>> {
+ public static final class DefaultJoin<I1, I2> extends EquiJoin<I1, I2, Tuple2<I1, I2>> implements JoinFunctionAssigner<I1, I2> {
- protected DefaultJoin(DataSet<I1> input1, DataSet<I2> input2,
- Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, String joinLocationName)
+ public DefaultJoin(DataSet<I1> input1, DataSet<I2> input2,
+ Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, String joinLocationName, JoinType type)
{
- super(input1, input2, keys1, keys2,
- (RichFlatJoinFunction<I1, I2, Tuple2<I1, I2>>) new DefaultFlatJoinFunction<I1, I2>(),
- new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), hint, joinLocationName);
+ super(input1, input2, keys1, keys2,
+ new DefaultFlatJoinFunction<I1, I2>(),
+ new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), hint, joinLocationName, type);
}
-
+
/**
* Finalizes a Join transformation by applying a {@link org.apache.flink.api.common.functions.RichFlatJoinFunction} to each pair of joined elements.<br/>
* Each JoinFunction call returns exactly one element.
@@ -547,16 +558,16 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
throw new NullPointerException("Join function must not be null.");
}
TypeInformation<R> returnType = TypeExtractor.getFlatJoinReturnTypes(function, getInput1Type(), getInput2Type());
- return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), clean(function), returnType, getJoinHint(), Utils.getCallLocationName());
+ return new EquiJoin<>(getInput1(), getInput2(), getKeys1(), getKeys2(), clean(function), returnType, getJoinHint(), Utils.getCallLocationName(), joinType);
}
public <R> EquiJoin<I1, I2, R> with (JoinFunction<I1, I2, R> function) {
if (function == null) {
throw new NullPointerException("Join function must not be null.");
}
- FlatJoinFunction<I1, I2, R> generatedFunction = new WrappingFlatJoinFunction<I1, I2, R>(clean(function));
+ FlatJoinFunction<I1, I2, R> generatedFunction = new WrappingFlatJoinFunction<>(clean(function));
TypeInformation<R> returnType = TypeExtractor.getJoinReturnTypes(function, getInput1Type(), getInput2Type());
- return new EquiJoin<I1, I2, R>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint(), Utils.getCallLocationName());
+ return new EquiJoin<>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint(), Utils.getCallLocationName(), joinType);
}
public static class WrappingFlatJoinFunction<IN1, IN2, OUT> extends WrappingFunction<JoinFunction<IN1,IN2,OUT>> implements FlatJoinFunction<IN1, IN2, OUT> {
@@ -582,7 +593,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
* {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectFirst(int...)} and
* {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectSecond(int...)}.
*
- * <b>Note: With the current implementation, the Project transformation looses type information.</b>
+ * <b>Note: With the current implementation, the Project transformation loses type information.</b>
*
* @param firstFieldIndexes If the first input is a Tuple DataSet, the indexes of the selected fields.
* For a non-Tuple DataSet, do not provide parameters.
@@ -595,7 +606,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
*/
public <OUT extends Tuple> ProjectJoin<I1, I2, OUT> projectFirst(int... firstFieldIndexes) {
JoinProjection<I1, I2> joinProjection = new JoinProjection<I1, I2>(getInput1(), getInput2(), getKeys1(), getKeys2(), getJoinHint(), firstFieldIndexes, null);
-
+
return joinProjection.projectTupleX();
}
@@ -608,7 +619,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
* {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectFirst(int...)} and
* {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectSecond(int...)}.
*
- * <b>Note: With the current implementation, the Project transformation looses type information.</b>
+ * <b>Note: With the current implementation, the Project transformation loses type information.</b>
*
* @param secondFieldIndexes If the second input is a Tuple DataSet, the indexes of the selected fields.
* For a non-Tuple DataSet, do not provide parameters.
@@ -624,7 +635,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
return joinProjection.projectTupleX();
}
-
+
// public JoinOperator<I1, I2, I1> leftSemiJoin() {
// return new LeftSemiJoin<I1, I2>(getInput1(), getInput2(), getKeys1(), getKeys2(), getJoinHint());
// }
@@ -659,22 +670,21 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
private JoinProjection<I1, I2> joinProj;
protected ProjectJoin(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] fields, boolean[] isFromFirst, TupleTypeInfo<OUT> returnType) {
- super(input1, input2, keys1, keys2,
+ super(input1, input2, keys1, keys2,
new ProjectFlatJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer(input1.getExecutionEnvironment().getConfig()).createInstance()),
returnType, hint, Utils.getCallLocationName(4)); // We need to use the 4th element in the stack because the call comes through .types().
-
joinProj = null;
}
protected ProjectJoin(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] fields, boolean[] isFromFirst, TupleTypeInfo<OUT> returnType, JoinProjection<I1, I2> joinProj) {
- super(input1, input2, keys1, keys2,
+ super(input1, input2, keys1, keys2,
new ProjectFlatJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer(input1.getExecutionEnvironment().getConfig()).createInstance()),
returnType, hint, Utils.getCallLocationName(4));
-
+
this.joinProj = joinProj;
}
-
+
@Override
protected ProjectFlatJoinFunction<I1, I2, OUT> getFunction() {
return (ProjectFlatJoinFunction<I1, I2, OUT>) super.getFunction();
@@ -689,7 +699,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
* {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectFirst(int...)} and
* {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectSecond(int...)}.
*
- * <b>Note: With the current implementation, the Project transformation looses type information.</b>
+ * <b>Note: With the current implementation, the Project transformation loses type information.</b>
*
* @param firstFieldIndexes If the first input is a Tuple DataSet, the indexes of the selected fields.
* For a non-Tuple DataSet, do not provide parameters.
@@ -716,7 +726,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
* {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectFirst(int...)} and
* {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectSecond(int...)}.
*
- * <b>Note: With the current implementation, the Project transformation looses type information.</b>
+ * <b>Note: With the current implementation, the Project transformation loses type information.</b>
*
* @param secondFieldIndexes If the second input is a Tuple DataSet, the indexes of the selected fields.
* For a non-Tuple DataSet, do not provide parameters.
@@ -834,188 +844,6 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
// }
// --------------------------------------------------------------------------------------------
- // Builder classes for incremental construction
- // --------------------------------------------------------------------------------------------
-
- /**
- * Intermediate step of a Join transformation. <br/>
- * To continue the Join transformation, select the join key of the first input {@link DataSet} by calling
- * {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets#where(int...)} or
- * {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets#where(KeySelector)}.
- *
- * @param <I1> The type of the first input DataSet of the Join transformation.
- * @param <I2> The type of the second input DataSet of the Join transformation.
- */
- public static final class JoinOperatorSets<I1, I2> {
-
- private final DataSet<I1> input1;
- private final DataSet<I2> input2;
-
- private final JoinHint joinHint;
-
- public JoinOperatorSets(DataSet<I1> input1, DataSet<I2> input2) {
- this(input1, input2, JoinHint.OPTIMIZER_CHOOSES);
- }
-
- public JoinOperatorSets(DataSet<I1> input1, DataSet<I2> input2, JoinHint hint) {
- if (input1 == null || input2 == null) {
- throw new NullPointerException();
- }
-
- this.input1 = input1;
- this.input2 = input2;
- this.joinHint = hint;
- }
-
- /**
- * Continues a Join transformation. <br/>
- * Defines the {@link Tuple} fields of the first join {@link DataSet} that should be used as join keys.<br/>
- * <b>Note: Fields can only be selected as join keys on Tuple DataSets.</b><br/>
- *
- * @param fields The indexes of the other Tuple fields of the first join DataSets that should be used as keys.
- * @return An incomplete Join transformation.
- * Call {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(int...)} or
- * {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(KeySelector)}
- * to continue the Join.
- *
- * @see Tuple
- * @see DataSet
- */
- public JoinOperatorSetsPredicate where(int... fields) {
- return new JoinOperatorSetsPredicate(new Keys.ExpressionKeys<I1>(fields, input1.getType()));
- }
-
- /**
- * Continues a Join transformation. <br/>
- * Defines the fields of the first join {@link DataSet} that should be used as grouping keys. Fields
- * are the names of member fields of the underlying type of the data set.
- *
- * @param fields The fields of the first join DataSets that should be used as keys.
- * @return An incomplete Join transformation.
- * Call {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(int...)} or
- * {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(KeySelector)}
- * to continue the Join.
- *
- * @see Tuple
- * @see DataSet
- */
- public JoinOperatorSetsPredicate where(String... fields) {
- return new JoinOperatorSetsPredicate(new Keys.ExpressionKeys<I1>(fields, input1.getType()));
- }
-
- /**
- * Continues a Join transformation and defines a {@link KeySelector} function for the first join {@link DataSet}.</br>
- * The KeySelector function is called for each element of the first DataSet and extracts a single
- * key value on which the DataSet is joined. </br>
- *
- * @param keySelector The KeySelector function which extracts the key values from the DataSet on which it is joined.
- * @return An incomplete Join transformation.
- * Call {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(int...)} or
- * {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(KeySelector)}
- * to continue the Join.
- *
- * @see KeySelector
- * @see DataSet
- */
- public <K> JoinOperatorSetsPredicate where(KeySelector<I1, K> keySelector) {
- TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
- return new JoinOperatorSetsPredicate(new Keys.SelectorFunctionKeys<I1, K>(keySelector, input1.getType(), keyType));
- }
-
- // ----------------------------------------------------------------------------------------
-
- /**
- * Intermediate step of a Join transformation. <br/>
- * To continue the Join transformation, select the join key of the second input {@link DataSet} by calling
- * {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(int...)} or
- * {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(KeySelector)}.
- *
- */
- public class JoinOperatorSetsPredicate {
-
- private final Keys<I1> keys1;
-
- private JoinOperatorSetsPredicate(Keys<I1> keys1) {
- if (keys1 == null) {
- throw new NullPointerException();
- }
-
- if (keys1.isEmpty()) {
- throw new InvalidProgramException("The join keys must not be empty.");
- }
-
- this.keys1 = keys1;
- }
-
- /**
- * Continues a Join transformation and defines the {@link Tuple} fields of the second join
- * {@link DataSet} that should be used as join keys.<br/>
- * <b>Note: Fields can only be selected as join keys on Tuple DataSets.</b><br/>
- *
- * The resulting {@link org.apache.flink.api.java.operators.JoinOperator.DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with
- * the element of the first input being the first field of the tuple and the element of the
- * second input being the second field of the tuple.
- *
- * @param fields The indexes of the Tuple fields of the second join DataSet that should be used as keys.
- * @return A DefaultJoin that represents the joined DataSet.
- */
- public DefaultJoin<I1, I2> equalTo(int... fields) {
- return createJoinOperator(new Keys.ExpressionKeys<I2>(fields, input2.getType()));
- }
-
- /**
- * Continues a Join transformation and defines the fields of the second join
- * {@link DataSet} that should be used as join keys.<br/>
- *
- * The resulting {@link org.apache.flink.api.java.operators.JoinOperator.DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with
- * the element of the first input being the first field of the tuple and the element of the
- * second input being the second field of the tuple.
- *
- * @param fields The fields of the second join DataSet that should be used as keys.
- * @return A DefaultJoin that represents the joined DataSet.
- */
- public DefaultJoin<I1, I2> equalTo(String... fields) {
- return createJoinOperator(new Keys.ExpressionKeys<I2>(fields, input2.getType()));
- }
-
- /**
- * Continues a Join transformation and defines a {@link KeySelector} function for the second join {@link DataSet}.</br>
- * The KeySelector function is called for each element of the second DataSet and extracts a single
- * key value on which the DataSet is joined. </br>
- *
- * The resulting {@link org.apache.flink.api.java.operators.JoinOperator.DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with
- * the element of the first input being the first field of the tuple and the element of the
- * second input being the second field of the tuple.
- *
- * @param keySelector The KeySelector function which extracts the key values from the second DataSet on which it is joined.
- * @return A DefaultJoin that represents the joined DataSet.
- */
- public <K> DefaultJoin<I1, I2> equalTo(KeySelector<I2, K> keySelector) {
- TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
- return createJoinOperator(new Keys.SelectorFunctionKeys<I2, K>(keySelector, input2.getType(), keyType));
- }
-
- protected DefaultJoin<I1, I2> createJoinOperator(Keys<I2> keys2) {
- if (keys2 == null) {
- throw new NullPointerException("The join keys may not be null.");
- }
-
- if (keys2.isEmpty()) {
- throw new InvalidProgramException("The join keys may not be empty.");
- }
-
- try {
- keys1.areCompatible(keys2);
- } catch (IncompatibleKeysException e) {
- throw new InvalidProgramException("The pair of join keys are not compatible with each other.",e);
- }
-
- return new DefaultJoin<I1, I2>(input1, input2, keys1, keys2, joinHint, Utils.getCallLocationName(4));
- }
- }
- }
-
- // --------------------------------------------------------------------------------------------
// default join functions
// --------------------------------------------------------------------------------------------
@@ -1033,7 +861,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
out.collect(outTuple);
}
}
-
+
public static final class ProjectFlatJoinFunction<T1, T2, R extends Tuple> extends RichFlatJoinFunction<T1, T2, R> {
private static final long serialVersionUID = 1L;
@@ -1052,10 +880,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
* @param outTupleInstance An instance of an output tuple.
*/
private ProjectFlatJoinFunction(int[] fields, boolean[] isFromFirst, R outTupleInstance) {
-
if(fields.length != isFromFirst.length) {
throw new IllegalArgumentException("Fields and isFromFirst arrays must have same length!");
}
+
this.fields = fields;
this.isFromFirst = isFromFirst;
this.outTuple = outTupleInstance;
@@ -1070,16 +898,16 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
}
public void join(T1 in1, T2 in2, Collector<R> out) {
- for(int i=0; i<fields.length; i++) {
- if(isFromFirst[i]) {
- if(fields[i] >= 0 && in1 != null) {
- outTuple.setField(((Tuple)in1).getField(fields[i]), i);
+ for (int i = 0; i < fields.length; i++) {
+ if (isFromFirst[i]) {
+ if (fields[i] >= 0 && in1 != null) {
+ outTuple.setField(((Tuple) in1).getField(fields[i]), i);
} else {
outTuple.setField(in1, i);
}
} else {
- if(fields[i] >= 0 && in2 != null) {
- outTuple.setField(((Tuple)in2).getField(fields[i]), i);
+ if (fields[i] >= 0 && in2 != null) {
+ outTuple.setField(((Tuple) in2).getField(fields[i]), i);
} else {
outTuple.setField(in2, i);
}
@@ -1097,7 +925,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
private final Keys<I1> keys1;
private final Keys<I2> keys2;
private final JoinHint hint;
-
+
private int[] fieldIndexes;
private boolean[] isFieldInFirst;
@@ -1105,13 +933,12 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
private final int numFieldsDs2;
public JoinProjection(DataSet<I1> ds1, DataSet<I2> ds2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] firstFieldIndexes, int[] secondFieldIndexes) {
-
this.ds1 = ds1;
this.ds2 = ds2;
this.keys1 = keys1;
this.keys2 = keys2;
this.hint = hint;
-
+
boolean isFirstTuple;
boolean isSecondTuple;
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/join/InnerJoinOperatorSets.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/join/InnerJoinOperatorSets.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/InnerJoinOperatorSets.java
new file mode 100644
index 0000000..e0e15ca
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/InnerJoinOperatorSets.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.join;
+
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * Intermediate step of an inner Join transformation. <br/>
+ * To continue the Join transformation, select the join key of the first input {@link DataSet} by calling
+ * {@link InnerJoinOperatorSets#where(int...)},
+ * {@link InnerJoinOperatorSets#where(String...)} or
+ * {@link InnerJoinOperatorSets#where(KeySelector)}.
+ * <p>
+ * In contrast to {@link JoinOperatorSets}, the keys of the second input are defined via methods that
+ * directly return a {@link DefaultJoin}.
+ *
+ * @param <I1> The type of the first input DataSet of the Join transformation.
+ * @param <I2> The type of the second input DataSet of the Join transformation.
+ */
+public final class InnerJoinOperatorSets<I1, I2> extends JoinOperatorSets<I1, I2> {
+
+	/**
+	 * Creates an inner Join transformation whose strategy is left to the optimizer.
+	 *
+	 * @param input1 The first input DataSet of the Join transformation.
+	 * @param input2 The second input DataSet of the Join transformation.
+	 */
+	public InnerJoinOperatorSets(DataSet<I1> input1, DataSet<I2> input2) {
+		super(input1, input2);
+	}
+
+	/**
+	 * Creates an inner Join transformation with the given join hint.
+	 *
+	 * @param input1 The first input DataSet of the Join transformation.
+	 * @param input2 The second input DataSet of the Join transformation.
+	 * @param hint The hint for the join strategy.
+	 */
+	public InnerJoinOperatorSets(DataSet<I1> input1, DataSet<I2> input2, JoinHint hint) {
+		super(input1, input2, hint);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 *
+	 * @return An incomplete Join transformation.
+	 *           Call {@link InnerJoinOperatorSetsPredicate#equalTo(int...)} or
+	 *           {@link InnerJoinOperatorSetsPredicate#equalTo(KeySelector)}
+	 *           to continue the Join.
+	 */
+	@Override
+	public InnerJoinOperatorSetsPredicate where(int... fields) {
+		return new InnerJoinOperatorSetsPredicate(new Keys.ExpressionKeys<>(fields, input1.getType()));
+	}
+
+	/**
+	 * {@inheritDoc}
+	 *
+	 * @return An incomplete Join transformation.
+	 *           Call {@link InnerJoinOperatorSetsPredicate#equalTo(int...)} or
+	 *           {@link InnerJoinOperatorSetsPredicate#equalTo(KeySelector)}
+	 *           to continue the Join.
+	 */
+	@Override
+	public InnerJoinOperatorSetsPredicate where(String... fields) {
+		return new InnerJoinOperatorSetsPredicate(new Keys.ExpressionKeys<>(fields, input1.getType()));
+	}
+
+	/**
+	 * {@inheritDoc}
+	 *
+	 * @return An incomplete Join transformation.
+	 *           Call {@link InnerJoinOperatorSetsPredicate#equalTo(int...)} or
+	 *           {@link InnerJoinOperatorSetsPredicate#equalTo(KeySelector)}
+	 *           to continue the Join.
+	 */
+	@Override
+	public <K> InnerJoinOperatorSetsPredicate where(KeySelector<I1, K> keySelector) {
+		TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+		return new InnerJoinOperatorSetsPredicate(new Keys.SelectorFunctionKeys<>(keySelector, input1.getType(), keyType));
+	}
+
+	/**
+	 * Intermediate step of an inner Join transformation. <br/>
+	 * To continue the Join transformation, select the join key of the second input {@link DataSet} by calling
+	 * {@link InnerJoinOperatorSetsPredicate#equalTo(int...)},
+	 * {@link InnerJoinOperatorSetsPredicate#equalTo(String...)} or
+	 * {@link InnerJoinOperatorSetsPredicate#equalTo(KeySelector)}.
+	 */
+	public class InnerJoinOperatorSetsPredicate extends JoinOperatorSetsPredicate {
+
+		private InnerJoinOperatorSetsPredicate(Keys<I1> keys1) {
+			super(keys1);
+		}
+
+		/**
+		 * Continues a Join transformation and defines the {@link Tuple} fields of the second join
+		 * {@link DataSet} that should be used as join keys.<br/>
+		 * <b>Note: Fields can only be selected as join keys on Tuple DataSets.</b><br/>
+		 * <p>
+		 * The resulting {@link DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with
+		 * the element of the first input being the first field of the tuple and the element of the
+		 * second input being the second field of the tuple.
+		 *
+		 * @param fields The indexes of the Tuple fields of the second join DataSet that should be used as keys.
+		 * @return A DefaultJoin that represents the joined DataSet.
+		 */
+		@Override
+		public DefaultJoin<I1, I2> equalTo(int... fields) {
+			return createDefaultJoin(new Keys.ExpressionKeys<>(fields, input2.getType()));
+		}
+
+		/**
+		 * Continues a Join transformation and defines the fields of the second join
+		 * {@link DataSet} that should be used as join keys.
+		 * <p>
+		 * The resulting {@link DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with
+		 * the element of the first input being the first field of the tuple and the element of the
+		 * second input being the second field of the tuple.
+		 *
+		 * @param fields The fields of the second join DataSet that should be used as keys.
+		 * @return A DefaultJoin that represents the joined DataSet.
+		 */
+		@Override
+		public DefaultJoin<I1, I2> equalTo(String... fields) {
+			return createDefaultJoin(new Keys.ExpressionKeys<>(fields, input2.getType()));
+		}
+
+		/**
+		 * Continues a Join transformation and defines a {@link KeySelector} function for the second join {@link DataSet}.<br/>
+		 * The KeySelector function is called for each element of the second DataSet and extracts a single
+		 * key value on which the DataSet is joined.
+		 * <p>
+		 * The resulting {@link DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with
+		 * the element of the first input being the first field of the tuple and the element of the
+		 * second input being the second field of the tuple.
+		 *
+		 * @param keySelector The KeySelector function which extracts the key values from the second DataSet on which it is joined.
+		 * @return A DefaultJoin that represents the joined DataSet.
+		 */
+		@Override
+		public <K> DefaultJoin<I1, I2> equalTo(KeySelector<I2, K> keySelector) {
+			TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
+			return createDefaultJoin(new Keys.SelectorFunctionKeys<>(keySelector, input2.getType(), keyType));
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinFunctionAssigner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinFunctionAssigner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinFunctionAssigner.java
new file mode 100644
index 0000000..163c5a6
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinFunctionAssigner.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.join;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.operators.JoinOperator;
+
+/**
+ * A Join transformation that needs to be finished by specifying either a
+ * {@link JoinFunction} or a {@link FlatJoinFunction} before it can be used as an input
+ * to other operators.
+ *
+ * @param <I1> The type of the first input DataSet of the Join transformation.
+ * @param <I2> The type of the second input DataSet of the Join transformation.
+ */
+public interface JoinFunctionAssigner<I1, I2> {
+
+	/**
+	 * Finishes the Join transformation by specifying the {@link JoinFunction} that produces
+	 * the result of the Join.
+	 *
+	 * @param joinFunction The JoinFunction of the Join transformation.
+	 * @param <R> The type of the result elements.
+	 * @return A JoinOperator that represents the joined result DataSet.
+	 */
+	<R> JoinOperator<I1, I2, R> with(JoinFunction<I1, I2, R> joinFunction);
+
+	/**
+	 * Finishes the Join transformation by specifying the {@link FlatJoinFunction} that produces
+	 * the result of the Join.
+	 *
+	 * @param joinFunction The FlatJoinFunction of the Join transformation.
+	 * @param <R> The type of the result elements.
+	 * @return A JoinOperator that represents the joined result DataSet.
+	 */
+	<R> JoinOperator<I1, I2, R> with(FlatJoinFunction<I1, I2, R> joinFunction);
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSets.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSets.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSets.java
new file mode 100644
index 0000000..705952c
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSets.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.join;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin;
+import org.apache.flink.api.java.operators.JoinOperator.EquiJoin;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+/**
+ * Intermediate step of a Join transformation. <br/>
+ * To continue the Join transformation, select the join key of the first input {@link DataSet} by calling
+ * {@link JoinOperatorSets#where(int...)},
+ * {@link JoinOperatorSets#where(String...)} or
+ * {@link JoinOperatorSets#where(KeySelector)}.
+ * <p>
+ * The {@link JoinType} given at construction ({@link JoinType#INNER} unless specified) is passed on
+ * to the {@link DefaultJoin} that is created once the keys of both inputs are defined.
+ *
+ * @param <I1> The type of the first input DataSet of the Join transformation.
+ * @param <I2> The type of the second input DataSet of the Join transformation.
+ */
+public class JoinOperatorSets<I1, I2> {
+
+	protected final DataSet<I1> input1;
+	protected final DataSet<I2> input2;
+
+	protected final JoinHint joinHint;
+	protected final JoinType joinType;
+
+	/**
+	 * Creates an inner Join transformation whose strategy is left to the optimizer
+	 * ({@link JoinHint#OPTIMIZER_CHOOSES}).
+	 *
+	 * @param input1 The first input DataSet of the Join transformation.
+	 * @param input2 The second input DataSet of the Join transformation.
+	 * @throws NullPointerException If either input is {@code null}.
+	 */
+	public JoinOperatorSets(DataSet<I1> input1, DataSet<I2> input2) {
+		this(input1, input2, JoinHint.OPTIMIZER_CHOOSES);
+	}
+
+	/**
+	 * Creates an inner Join transformation with the given join hint.
+	 *
+	 * @param input1 The first input DataSet of the Join transformation.
+	 * @param input2 The second input DataSet of the Join transformation.
+	 * @param hint The hint for the join strategy.
+	 * @throws NullPointerException If either input is {@code null}.
+	 */
+	public JoinOperatorSets(DataSet<I1> input1, DataSet<I2> input2, JoinHint hint) {
+		this(input1, input2, hint, JoinType.INNER);
+	}
+
+	/**
+	 * Creates a Join transformation of the given type with the given join hint.
+	 *
+	 * @param input1 The first input DataSet of the Join transformation.
+	 * @param input2 The second input DataSet of the Join transformation.
+	 * @param hint The hint for the join strategy.
+	 * @param type The type of the join (inner, left outer, right outer or full outer).
+	 * @throws NullPointerException If either input is {@code null}.
+	 */
+	public JoinOperatorSets(DataSet<I1> input1, DataSet<I2> input2, JoinHint hint, JoinType type) {
+		if (input1 == null || input2 == null) {
+			throw new NullPointerException();
+		}
+
+		this.input1 = input1;
+		this.input2 = input2;
+		this.joinHint = hint;
+		this.joinType = type;
+	}
+
+	/**
+	 * Continues a Join transformation. <br/>
+	 * Defines the {@link Tuple} fields of the first join {@link DataSet} that should be used as join keys.<br/>
+	 * <b>Note: Fields can only be selected as join keys on Tuple DataSets.</b><br/>
+	 *
+	 * @param fields The indexes of the other Tuple fields of the first join DataSets that should be used as keys.
+	 * @return An incomplete Join transformation.
+	 *           Call {@link JoinOperatorSetsPredicate#equalTo(int...)} or
+	 *           {@link JoinOperatorSetsPredicate#equalTo(KeySelector)}
+	 *           to continue the Join.
+	 *
+	 * @see Tuple
+	 * @see DataSet
+	 */
+	public JoinOperatorSetsPredicate where(int... fields) {
+		return new JoinOperatorSetsPredicate(new Keys.ExpressionKeys<>(fields, input1.getType()));
+	}
+
+	/**
+	 * Continues a Join transformation. <br/>
+	 * Defines the fields of the first join {@link DataSet} that should be used as join keys. Fields
+	 * are the names of member fields of the underlying type of the data set.
+	 *
+	 * @param fields The fields of the first join DataSets that should be used as keys.
+	 * @return An incomplete Join transformation.
+	 *           Call {@link JoinOperatorSetsPredicate#equalTo(int...)} or
+	 *           {@link JoinOperatorSetsPredicate#equalTo(KeySelector)}
+	 *           to continue the Join.
+	 *
+	 * @see Tuple
+	 * @see DataSet
+	 */
+	public JoinOperatorSetsPredicate where(String... fields) {
+		return new JoinOperatorSetsPredicate(new Keys.ExpressionKeys<>(fields, input1.getType()));
+	}
+
+	/**
+	 * Continues a Join transformation and defines a {@link KeySelector} function for the first join {@link DataSet}.<br/>
+	 * The KeySelector function is called for each element of the first DataSet and extracts a single
+	 * key value on which the DataSet is joined.
+	 *
+	 * @param keySelector The KeySelector function which extracts the key values from the DataSet on which it is joined.
+	 * @return An incomplete Join transformation.
+	 *           Call {@link JoinOperatorSetsPredicate#equalTo(int...)} or
+	 *           {@link JoinOperatorSetsPredicate#equalTo(KeySelector)}
+	 *           to continue the Join.
+	 *
+	 * @see KeySelector
+	 * @see DataSet
+	 */
+	public <K> JoinOperatorSetsPredicate where(KeySelector<I1, K> keySelector) {
+		TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
+		return new JoinOperatorSetsPredicate(new Keys.SelectorFunctionKeys<>(keySelector, input1.getType(), keyType));
+	}
+
+	/**
+	 * Intermediate step of a Join transformation. <br/>
+	 * To continue the Join transformation, select the join key of the second input {@link DataSet} by calling
+	 * {@link JoinOperatorSetsPredicate#equalTo(int...)},
+	 * {@link JoinOperatorSetsPredicate#equalTo(String...)} or
+	 * {@link JoinOperatorSetsPredicate#equalTo(KeySelector)}.
+	 */
+	public class JoinOperatorSetsPredicate {
+
+		protected final Keys<I1> keys1;
+
+		/**
+		 * Creates the second step of the Join transformation from the join keys of the first input.
+		 *
+		 * @param keys1 The join keys of the first input.
+		 * @throws NullPointerException If {@code keys1} is {@code null}.
+		 * @throws InvalidProgramException If {@code keys1} is empty.
+		 */
+		protected JoinOperatorSetsPredicate(Keys<I1> keys1) {
+			if (keys1 == null) {
+				throw new NullPointerException();
+			}
+
+			if (keys1.isEmpty()) {
+				throw new InvalidProgramException("The join keys must not be empty.");
+			}
+
+			this.keys1 = keys1;
+		}
+
+		/**
+		 * Continues a Join transformation and defines the {@link Tuple} fields of the second join
+		 * {@link DataSet} that should be used as join keys.<br/>
+		 * <b>Note: Fields can only be selected as join keys on Tuple DataSets.</b><br/>
+		 * <p>
+		 * The resulting {@link JoinFunctionAssigner} needs to be finished by providing a
+		 * {@link JoinFunction} or a {@link FlatJoinFunction} via {@link JoinFunctionAssigner#with(JoinFunction)}
+		 * or {@link JoinFunctionAssigner#with(FlatJoinFunction)}.
+		 *
+		 * @param fields The indexes of the Tuple fields of the second join DataSet that should be used as keys.
+		 * @return A JoinFunctionAssigner.
+		 */
+		public JoinFunctionAssigner<I1, I2> equalTo(int... fields) {
+			return createJoinFunctionAssigner(new Keys.ExpressionKeys<>(fields, input2.getType()));
+		}
+
+		/**
+		 * Continues a Join transformation and defines the fields of the second join
+		 * {@link DataSet} that should be used as join keys.
+		 * <p>
+		 * The resulting {@link JoinFunctionAssigner} needs to be finished by providing a
+		 * {@link JoinFunction} or a {@link FlatJoinFunction} via {@link JoinFunctionAssigner#with(JoinFunction)}
+		 * or {@link JoinFunctionAssigner#with(FlatJoinFunction)}.
+		 *
+		 * @param fields The fields of the second join DataSet that should be used as keys.
+		 * @return A JoinFunctionAssigner.
+		 */
+		public JoinFunctionAssigner<I1, I2> equalTo(String... fields) {
+			return createJoinFunctionAssigner(new Keys.ExpressionKeys<>(fields, input2.getType()));
+		}
+
+		/**
+		 * Continues a Join transformation and defines a {@link KeySelector} function for the second join {@link DataSet}.<br/>
+		 * The KeySelector function is called for each element of the second DataSet and extracts a single
+		 * key value on which the DataSet is joined.
+		 * <p>
+		 * The resulting {@link JoinFunctionAssigner} needs to be finished by providing a
+		 * {@link JoinFunction} or a {@link FlatJoinFunction} via {@link JoinFunctionAssigner#with(JoinFunction)}
+		 * or {@link JoinFunctionAssigner#with(FlatJoinFunction)}.
+		 *
+		 * @param keySelector The KeySelector function which extracts the key values from the second DataSet on which it is joined.
+		 * @return A JoinFunctionAssigner.
+		 */
+		public <K> JoinFunctionAssigner<I1, I2> equalTo(KeySelector<I2, K> keySelector) {
+			TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
+			return createJoinFunctionAssigner(new Keys.SelectorFunctionKeys<>(keySelector, input2.getType(), keyType));
+		}
+
+		/**
+		 * Creates the Join for the given keys of the second input and wraps it into a {@link JoinFunctionAssigner}.
+		 *
+		 * @param keys2 The join keys of the second input.
+		 * @return A JoinFunctionAssigner for the created Join.
+		 */
+		protected JoinFunctionAssigner<I1, I2> createJoinFunctionAssigner(Keys<I2> keys2) {
+			// Resolve the call location in this frame, at the same depth createDefaultJoin(Keys) uses when it is
+			// called directly from equalTo(...). Delegating to createDefaultJoin(Keys) would add one more frame,
+			// and depth 4 would then name equalTo(...) of this class instead of the user code that called it.
+			DefaultJoin<I1, I2> join = buildDefaultJoin(keys2, Utils.getCallLocationName(4));
+			return new DefaultJoinFunctionAssigner(join);
+		}
+
+		/**
+		 * Creates the Join for the given keys of the second input. Intended to be called directly from an
+		 * {@code equalTo(...)} method so that the user's call site is recorded as the operator's call location.
+		 *
+		 * @param keys2 The join keys of the second input.
+		 * @return A DefaultJoin that represents the joined DataSet.
+		 * @throws NullPointerException If {@code keys2} is {@code null}.
+		 * @throws InvalidProgramException If {@code keys2} is empty or not compatible with the keys of the first input.
+		 */
+		protected DefaultJoin<I1, I2> createDefaultJoin(Keys<I2> keys2) {
+			// Resolved here rather than in buildDefaultJoin so that the stack depth matches a direct call from equalTo(...).
+			return buildDefaultJoin(keys2, Utils.getCallLocationName(4));
+		}
+
+		/**
+		 * Validates the keys of the second input against those of the first input and creates the Join.
+		 *
+		 * @param keys2 The join keys of the second input.
+		 * @param callLocation The call location (user code) to attach to the created operator.
+		 * @return A DefaultJoin that represents the joined DataSet.
+		 */
+		private DefaultJoin<I1, I2> buildDefaultJoin(Keys<I2> keys2, String callLocation) {
+			if (keys2 == null) {
+				throw new NullPointerException("The join keys may not be null.");
+			}
+
+			if (keys2.isEmpty()) {
+				throw new InvalidProgramException("The join keys may not be empty.");
+			}
+
+			try {
+				keys1.areCompatible(keys2);
+			} catch (Keys.IncompatibleKeysException e) {
+				throw new InvalidProgramException("The pair of join keys are not compatible with each other.", e);
+			}
+			return new DefaultJoin<>(input1, input2, keys1, keys2, joinHint, callLocation, joinType);
+		}
+
+		/**
+		 * {@link JoinFunctionAssigner} that finishes the wrapped {@link DefaultJoin} with the given function.
+		 */
+		private class DefaultJoinFunctionAssigner implements JoinFunctionAssigner<I1, I2> {
+
+			private final DefaultJoin<I1, I2> defaultJoin;
+
+			public DefaultJoinFunctionAssigner(DefaultJoin<I1, I2> defaultJoin) {
+				this.defaultJoin = defaultJoin;
+			}
+
+			@Override
+			public <R> EquiJoin<I1, I2, R> with(JoinFunction<I1, I2, R> joinFunction) {
+				return defaultJoin.with(joinFunction);
+			}
+
+			@Override
+			public <R> EquiJoin<I1, I2, R> with(FlatJoinFunction<I1, I2, R> joinFunction) {
+				return defaultJoin.with(joinFunction);
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinType.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinType.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinType.java
new file mode 100644
index 0000000..9d00fab
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinType.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.join;
+
+/**
+ * The kind of a Join transformation: an inner join or one of the outer join variants
+ * (left, right or full).
+ */
+public enum JoinType {
+
+	/** Inner join. */
+	INNER,
+
+	/** Left outer join. */
+	LEFT_OUTER,
+
+	/** Right outer join. */
+	RIGHT_OUTER,
+
+	/** Full outer join. */
+	FULL_OUTER;
+
+	/**
+	 * Checks whether this join type is one of the outer join variants.
+	 *
+	 * @return {@code false} for {@link #INNER}; {@code true} for {@link #LEFT_OUTER},
+	 *         {@link #RIGHT_OUTER} and {@link #FULL_OUTER}.
+	 */
+	public boolean isOuter() {
+		// INNER is the only constant that is not an outer join.
+		return this != INNER;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingJoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingJoinOperator.java
deleted file mode 100644
index e9ded4f..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingJoinOperator.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.operators.translation;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-public class PlanBothUnwrappingJoinOperator<I1, I2, OUT, K>
- extends JoinOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, FlatJoinFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>>
-{
-
- public PlanBothUnwrappingJoinOperator(
- FlatJoinFunction<I1, I2, OUT> udf,
- Keys.SelectorFunctionKeys<I1, K> key1,
- Keys.SelectorFunctionKeys<I2, K> key2, String name,
- TypeInformation<OUT> resultType,
- TypeInformation<Tuple2<K, I1>> typeInfoWithKey1,
- TypeInformation<Tuple2<K, I2>> typeInfoWithKey2) {
-
- super(
- new TupleUnwrappingJoiner<I1, I2, OUT, K>(udf),
- new BinaryOperatorInformation<Tuple2<K, I1>, Tuple2<K, I2>, OUT>(
- typeInfoWithKey1,
- typeInfoWithKey2,
- resultType),
- key1.computeLogicalKeyPositions(), key2.computeLogicalKeyPositions(), name);
- }
-
- public static final class TupleUnwrappingJoiner<I1, I2, OUT, K>
- extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
- implements FlatJoinFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT> {
-
- private static final long serialVersionUID = 1L;
-
- private TupleUnwrappingJoiner(FlatJoinFunction<I1, I2, OUT> wrapped) {
- super(wrapped);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void join (Tuple2<K, I1> value1, Tuple2<K, I2> value2, Collector<OUT> collector) throws Exception {
- wrappedFunction.join ((I1)(value1.getField(1)), (I2)(value2.getField(1)), collector);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingJoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingJoinOperator.java
deleted file mode 100644
index c6ee804..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingJoinOperator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.operators.translation;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-public class PlanLeftUnwrappingJoinOperator<I1, I2, OUT, K>
- extends JoinOperatorBase<Tuple2<K, I1>, I2, OUT, FlatJoinFunction<Tuple2<K, I1>, I2, OUT>> {
-
- public PlanLeftUnwrappingJoinOperator(
- FlatJoinFunction<I1, I2, OUT> udf,
- Keys.SelectorFunctionKeys<I1, K> key1,
- int[] key2, String name,
- TypeInformation<OUT> resultType,
- TypeInformation<Tuple2<K, I1>> typeInfoWithKey1,
- TypeInformation<I2> typeInfo2) {
- super(
- new TupleLeftUnwrappingJoiner<I1, I2, OUT, K>(udf),
- new BinaryOperatorInformation<Tuple2<K, I1>, I2, OUT>(
- typeInfoWithKey1,
- typeInfo2,
- resultType),
- key1.computeLogicalKeyPositions(), key2, name);
- }
-
- public static final class TupleLeftUnwrappingJoiner<I1, I2, OUT, K>
- extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
- implements FlatJoinFunction<Tuple2<K, I1>, I2, OUT> {
-
- private static final long serialVersionUID = 1L;
-
- private TupleLeftUnwrappingJoiner(FlatJoinFunction<I1, I2, OUT> wrapped) {
- super(wrapped);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void join (Tuple2<K, I1> value1, I2 value2, Collector<OUT> collector) throws Exception {
- wrappedFunction.join ((I1)(value1.getField(1)), value2, collector);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingJoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingJoinOperator.java
deleted file mode 100644
index dc460f8..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingJoinOperator.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.operators.translation;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-public class PlanRightUnwrappingJoinOperator<I1, I2, OUT, K>
- extends JoinOperatorBase<I1, Tuple2<K, I2>, OUT, FlatJoinFunction<I1, Tuple2<K, I2>, OUT>> {
-
- public PlanRightUnwrappingJoinOperator(
- FlatJoinFunction<I1, I2, OUT> udf,
- int[] key1,
- Keys.SelectorFunctionKeys<I2, K> key2,
- String name,
- TypeInformation<OUT> type,
- TypeInformation<I1> typeInfo1,
- TypeInformation<Tuple2<K, I2>> typeInfoWithKey2) {
-
- super(
- new TupleRightUnwrappingJoiner<I1, I2, OUT, K>(udf),
- new BinaryOperatorInformation<I1, Tuple2<K, I2>, OUT>(
- typeInfo1,
- typeInfoWithKey2,
- type),
- key1, key2.computeLogicalKeyPositions(), name);
- }
-
- public static final class TupleRightUnwrappingJoiner<I1, I2, OUT, K>
- extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
- implements FlatJoinFunction<I1, Tuple2<K, I2>, OUT> {
-
- private static final long serialVersionUID = 1L;
-
- private TupleRightUnwrappingJoiner(FlatJoinFunction<I1, I2, OUT> wrapped) {
- super(wrapped);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void join (I1 value1, Tuple2<K, I2> value2, Collector<OUT> collector) throws Exception {
- wrappedFunction.join (value1, (I2)(value2.getField(1)), collector);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
new file mode 100644
index 0000000..18d3378
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+/**
+ * Adapts a {@link FlatJoinFunction} over plain records to a join whose left (first) input
+ * has been wrapped into {@code Tuple2<K, I1>} values that carry an extracted key in {@code f0}
+ * and the original record in {@code f1}. Only the original left record is passed on to the
+ * wrapped function; the right record is forwarded unchanged.
+ *
+ * @param <I1> type of the original left record
+ * @param <I2> type of the right record
+ * @param <OUT> type of the join result
+ * @param <K> type of the key stored in field {@code f0} of the left tuple
+ */
+public final class TupleLeftUnwrappingJoiner<I1, I2, OUT, K>
+		extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
+		implements FlatJoinFunction<Tuple2<K, I1>, I2, OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates a joiner that delegates to the given function.
+	 *
+	 * @param wrapped the join function that expects unwrapped left records
+	 */
+	public TupleLeftUnwrappingJoiner(FlatJoinFunction<I1, I2, OUT> wrapped) {
+		super(wrapped);
+	}
+
+	/**
+	 * Strips the key from the left record and calls the wrapped function. A {@code null} left
+	 * record, which an outer join supplies when a right record has no matching left record, is
+	 * forwarded as {@code null} instead of being dereferenced.
+	 */
+	@Override
+	public void join(Tuple2<K, I1> value1, I2 value2, Collector<OUT> collector) throws Exception {
+		// f1 is statically typed as I1, so no unchecked cast through getField(1) is needed.
+		I1 left = value1 == null ? null : value1.f1;
+		wrappedFunction.join(left, value2, collector);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
new file mode 100644
index 0000000..004a0ae
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+/**
+ * Adapts a {@link FlatJoinFunction} over plain records to a join whose right (second) input
+ * has been wrapped into {@code Tuple2<K, I2>} values that carry an extracted key in {@code f0}
+ * and the original record in {@code f1}. Only the original right record is passed on to the
+ * wrapped function; the left record is forwarded unchanged.
+ *
+ * @param <I1> type of the left record
+ * @param <I2> type of the original right record
+ * @param <OUT> type of the join result
+ * @param <K> type of the key stored in field {@code f0} of the right tuple
+ */
+public final class TupleRightUnwrappingJoiner<I1, I2, OUT, K>
+		extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
+		implements FlatJoinFunction<I1, Tuple2<K, I2>, OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates a joiner that delegates to the given function.
+	 *
+	 * @param wrapped the join function that expects unwrapped right records
+	 */
+	public TupleRightUnwrappingJoiner(FlatJoinFunction<I1, I2, OUT> wrapped) {
+		super(wrapped);
+	}
+
+	/**
+	 * Strips the key from the right record and calls the wrapped function. A {@code null} right
+	 * record, which an outer join supplies when a left record has no matching right record, is
+	 * forwarded as {@code null} instead of being dereferenced.
+	 */
+	@Override
+	public void join(I1 value1, Tuple2<K, I2> value2, Collector<OUT> collector) throws Exception {
+		// f1 is statically typed as I2, so no unchecked cast through getField(1) is needed.
+		I2 right = value2 == null ? null : value2.f1;
+		wrappedFunction.join(value1, right, collector);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
new file mode 100644
index 0000000..15cc137
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+/**
+ * Adapts a {@link FlatJoinFunction} over plain records to a join whose two inputs have both
+ * been wrapped into {@code Tuple2} values that carry an extracted key in {@code f0} and the
+ * original record in {@code f1}. Only the original records are passed on to the wrapped function.
+ *
+ * @param <I1> type of the original left record
+ * @param <I2> type of the original right record
+ * @param <OUT> type of the join result
+ * @param <K> type of the key stored in field {@code f0} of both tuples
+ */
+public final class TupleUnwrappingJoiner<I1, I2, OUT, K>
+		extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
+		implements FlatJoinFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates a joiner that delegates to the given function.
+	 *
+	 * @param wrapped the join function that expects unwrapped records on both sides
+	 */
+	public TupleUnwrappingJoiner(FlatJoinFunction<I1, I2, OUT> wrapped) {
+		super(wrapped);
+	}
+
+	/**
+	 * Strips the keys from both records and calls the wrapped function. A {@code null} record on
+	 * either side (the missing side of an outer-join pair) is forwarded as {@code null}.
+	 */
+	@Override
+	public void join(Tuple2<K, I1> value1, Tuple2<K, I2> value2, Collector<OUT> collector) throws Exception {
+		wrappedFunction.join(unwrap(value1), unwrap(value2), collector);
+	}
+
+	/**
+	 * Returns the record held in {@code f1}, or {@code null} for a {@code null} tuple. Reading the
+	 * typed field directly avoids the unchecked cast that {@code getField(1)} would require.
+	 */
+	private static <V> V unwrap(Tuple2<?, V> t) {
+		return t == null ? null : t.f1;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
index 6dc150e..75744fe 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/JoinOperator.java
@@ -26,7 +26,7 @@ import java.util.Map;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
@@ -50,7 +50,7 @@ import com.google.common.base.Preconditions;
*/
@Deprecated
-public class JoinOperator extends JoinOperatorBase<Record, Record, Record, JoinFunction> implements RecordOperator {
+public class JoinOperator extends InnerJoinOperatorBase<Record, Record, Record, JoinFunction> implements RecordOperator {
/**
* The types of the keys that the operator operates on.
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
new file mode 100644
index 0000000..8bc29d4
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+/**
+ * Tests the collection-based execution of {@link InnerJoinOperatorBase} on tuple inputs joined
+ * on composite keys, once with object reuse disabled and once with it enabled.
+ */
+@SuppressWarnings({ "unchecked", "serial" })
+public class InnerJoinOperatorBaseTest implements Serializable {
+
+	@Test
+	public void testTupleBaseJoiner() throws Exception {
+		// The UDF re-checks the join condition (left.f0 == right.f1 and left.f2 == right.f0),
+		// so any pair with unequal keys that reaches it fails the test.
+		final FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer, String>, Tuple2<Double, String>> joiner =
+				new FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer, String>, Tuple2<Double, String>>()
+		{
+			@Override
+			public void join(Tuple3<String, Double, Integer> first, Tuple2<Integer, String> second, Collector<Tuple2<Double, String>> out) {
+
+				assertEquals(first.f0, second.f1);
+				assertEquals(first.f2, second.f0);
+
+				out.collect(new Tuple2<>(first.f1, second.f0.toString()));
+			}
+		};
+
+		final TupleTypeInfo<Tuple3<String, Double, Integer>> leftTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo
+				(String.class, Double.class, Integer.class);
+		final TupleTypeInfo<Tuple2<Integer, String>> rightTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class,
+				String.class);
+		final TupleTypeInfo<Tuple2<Double, String>> outTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Double.class,
+				String.class);
+
+		// Composite key: left fields (0, 2) are matched against right fields (1, 0).
+		final int[] leftKeys = new int[]{0,2};
+		final int[] rightKeys = new int[]{1,0};
+
+		final String taskName = "Collection based tuple joiner";
+
+		final BinaryOperatorInformation<Tuple3<String, Double, Integer>, Tuple2<Integer, String>, Tuple2<Double,
+				String>> binaryOpInfo = new BinaryOperatorInformation<Tuple3<String, Double, Integer>, Tuple2<Integer,
+				String>, Tuple2<Double, String>>(leftTypeInfo, rightTypeInfo, outTypeInfo);
+
+		final InnerJoinOperatorBase<Tuple3<String, Double, Integer>, Tuple2<Integer,
+				String>, Tuple2<Double, String>, FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer,
+				String>, Tuple2<Double, String>>> base = new InnerJoinOperatorBase<Tuple3<String, Double, Integer>,
+				Tuple2<Integer, String>, Tuple2<Double, String>, FlatJoinFunction<Tuple3<String, Double, Integer>,
+				Tuple2<Integer, String>, Tuple2<Double, String>>>(joiner, binaryOpInfo, leftKeys, rightKeys, taskName);
+
+		final List<Tuple3<String, Double, Integer> > inputData1 = new ArrayList<Tuple3<String, Double,
+				Integer>>(Arrays.asList(
+				new Tuple3<>("foo", 42.0, 1),
+				new Tuple3<>("bar", 1.0, 2),
+				new Tuple3<>("bar", 2.0, 3),
+				new Tuple3<>("foobar", 3.0, 4),
+				new Tuple3<>("bar", 3.0, 3)
+		));
+
+		final List<Tuple2<Integer, String>> inputData2 = new ArrayList<Tuple2<Integer, String>>(Arrays.asList(
+				new Tuple2<>(3, "bar"),
+				new Tuple2<>(4, "foobar"),
+				new Tuple2<>(2, "foo")
+		));
+		final Set<Tuple2<Double, String>> expected = new HashSet<Tuple2<Double, String>>(Arrays.asList(
+				new Tuple2<>(2.0, "3"),
+				new Tuple2<>(3.0, "3"),
+				new Tuple2<>(3.0, "4")
+		));
+
+		// Exceptions are deliberately not caught: JUnit then reports the full stack trace and cause
+		// instead of only the exception message.
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.disableObjectReuse();
+		List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+		executionConfig.enableObjectReuse();
+		List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
+
+		// Each expected pair is produced exactly once, so sizes are compared too: a pure Set
+		// comparison would not notice duplicated output records.
+		assertEquals(expected.size(), resultSafe.size());
+		assertEquals(expected, new HashSet<>(resultSafe));
+		assertEquals(expected.size(), resultRegular.size());
+		assertEquals(expected, new HashSet<>(resultRegular));
+	}
+}