You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/09 16:20:54 UTC
[5/6] flink git commit: [FLINK-2576] [javaAPI] [scalaAPI] [optimizer]
Add outerJoin to DataSet API (Java, Scala) and optimizer.
[FLINK-2576] [javaAPI] [scalaAPI] [optimizer] Add outerJoin to DataSet API (Java, Scala) and optimizer.
This closes #1138
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b00c1d7e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b00c1d7e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b00c1d7e
Branch: refs/heads/master
Commit: b00c1d7e7b83a3f2e42223fe97dd369755f884c3
Parents: 6b22227
Author: Johann Kovacs <me...@jkovacs.de>
Authored: Tue Sep 8 18:23:54 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Oct 9 16:19:21 2015 +0200
----------------------------------------------------------------------
.../api/common/io/ReplicatingInputFormat.java | 5 +-
.../operators/base/InnerJoinOperatorBase.java | 148 ++++
.../common/operators/base/JoinOperatorBase.java | 155 +----
.../operators/base/OuterJoinOperatorBase.java | 2 +-
.../base/InnerJoinOperatorBaseTest.java | 141 ++++
.../operators/base/JoinOperatorBaseTest.java | 141 ----
.../java/org/apache/flink/api/java/DataSet.java | 197 +++++-
.../flink/api/java/operators/JoinOperator.java | 693 +++++++------------
.../operators/join/InnerJoinOperatorSets.java | 154 +++++
.../operators/join/JoinFunctionAssigner.java | 39 ++
.../java/operators/join/JoinOperatorSets.java | 235 +++++++
.../flink/api/java/operators/join/JoinType.java | 28 +
.../PlanBothUnwrappingJoinOperator.java | 66 --
.../PlanLeftUnwrappingJoinOperator.java | 64 --
.../PlanRightUnwrappingJoinOperator.java | 66 --
.../translation/TupleLeftUnwrappingJoiner.java | 40 ++
.../translation/TupleRightUnwrappingJoiner.java | 40 ++
.../translation/TupleUnwrappingJoiner.java | 45 ++
.../api/java/record/operators/JoinOperator.java | 4 +-
.../base/InnerJoinOperatorBaseTest.java | 120 ++++
.../operators/base/JoinOperatorBaseTest.java | 122 ----
.../SemanticPropertiesProjectionTest.java | 6 +-
.../SemanticPropertiesTranslationTest.java | 16 +-
.../flink/api/java/operators/NamesTest.java | 4 +-
.../DeltaIterationTranslationTest.java | 6 +-
.../flink/optimizer/costs/CostEstimator.java | 3 +
.../apache/flink/optimizer/dag/JoinNode.java | 18 +-
.../apache/flink/optimizer/dag/MatchNode.java | 167 -----
.../flink/optimizer/dag/OuterJoinNode.java | 130 ++++
.../AbstractSortMergeJoinDescriptor.java | 81 +++
.../optimizer/operators/CoGroupDescriptor.java | 25 +-
.../operators/OperatorDescriptorDual.java | 24 +
.../SortMergeFullOuterJoinDescriptor.java | 39 ++
.../operators/SortMergeInnerJoinDescriptor.java | 44 ++
.../operators/SortMergeJoinDescriptor.java | 110 ---
.../SortMergeLeftOuterJoinDescriptor.java | 43 ++
.../SortMergeRightOuterJoinDescriptor.java | 43 ++
.../traversals/GraphCreatingVisitor.java | 15 +-
.../optimizer/FeedbackPropertiesMatchTest.java | 4 +-
.../SemanticPropertiesAPIToPlanTest.java | 4 +-
.../JoinGlobalPropertiesCompatibilityTest.java | 6 +-
.../org/apache/flink/api/scala/DataSet.scala | 117 +++-
.../apache/flink/api/scala/joinDataSet.scala | 139 +++-
.../apache/flink/test/util/TestEnvironment.java | 21 +-
.../test/javaApiOperators/OuterJoinITCase.java | 605 ++++++++++++++++
.../SemanticPropertiesTranslationTest.scala | 10 +-
.../api/scala/operators/OuterJoinITCase.scala | 214 ++++++
.../DeltaIterationTranslationTest.scala | 10 +-
48 files changed, 2946 insertions(+), 1463 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
index 3d0ea99..5c8eb27 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.common.io;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
@@ -33,7 +34,7 @@ import java.io.IOException;
* This is done by assigning all {@link org.apache.flink.core.io.InputSplit}s generated by the
* replicated InputFormat to each parallel instance.
*
- * Replicated data can only be used as input for a {@link org.apache.flink.api.common.operators.base.JoinOperatorBase} or
+ * Replicated data can only be used as input for a {@link InnerJoinOperatorBase} or
* {@link org.apache.flink.api.common.operators.base.CrossOperatorBase} with the same parallelism as the DataSource.
* Before being used as an input to a Join or Cross operator, replicated data might be processed in local pipelines by
* by Map-based operators with the same parallelism as the source. Map-based operators are
@@ -54,7 +55,7 @@ import java.io.IOException;
*
* @see org.apache.flink.api.common.io.InputFormat
* @see org.apache.flink.api.common.io.RichInputFormat
- * @see org.apache.flink.api.common.operators.base.JoinOperatorBase
+ * @see InnerJoinOperatorBase
* @see org.apache.flink.api.common.operators.base.CrossOperatorBase
* @see org.apache.flink.api.common.operators.base.MapOperatorBase
* @see org.apache.flink.api.common.operators.base.FlatMapOperatorBase
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBase.java
new file mode 100644
index 0000000..8ec0c9e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBase.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @see org.apache.flink.api.common.functions.FlatJoinFunction
+ */
+public class InnerJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN2, OUT>> extends JoinOperatorBase<IN1, IN2, OUT, FT> {
+
+ public InnerJoinOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
+ int[] keyPositions1, int[] keyPositions2, String name) {
+ super(udf, operatorInfo, keyPositions1, keyPositions2, name);
+ }
+
+ public InnerJoinOperatorBase(FT udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1,
+ int[] keyPositions2, String name) {
+ super(new UserCodeObjectWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name);
+ }
+
+ public InnerJoinOperatorBase(Class<? extends FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
+ int[] keyPositions1, int[] keyPositions2, String name) {
+ super(new UserCodeClassWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext runtimeContext,
+ ExecutionConfig executionConfig) throws Exception {
+ FlatJoinFunction<IN1, IN2, OUT> function = userFunction.getUserCodeObject();
+
+ FunctionUtils.setFunctionRuntimeContext(function, runtimeContext);
+ FunctionUtils.openFunction(function, this.parameters);
+
+ TypeInformation<IN1> leftInformation = getOperatorInfo().getFirstInputType();
+ TypeInformation<IN2> rightInformation = getOperatorInfo().getSecondInputType();
+ TypeInformation<OUT> outInformation = getOperatorInfo().getOutputType();
+
+ TypeSerializer<IN1> leftSerializer = leftInformation.createSerializer(executionConfig);
+ TypeSerializer<IN2> rightSerializer = rightInformation.createSerializer(executionConfig);
+
+ TypeComparator<IN1> leftComparator;
+ TypeComparator<IN2> rightComparator;
+
+ if (leftInformation instanceof AtomicType) {
+ leftComparator = ((AtomicType<IN1>) leftInformation).createComparator(true, executionConfig);
+ } else if (leftInformation instanceof CompositeType) {
+ int[] keyPositions = getKeyColumns(0);
+ boolean[] orders = new boolean[keyPositions.length];
+ Arrays.fill(orders, true);
+
+ leftComparator = ((CompositeType<IN1>) leftInformation).createComparator(keyPositions, orders, 0, executionConfig);
+ } else {
+ throw new RuntimeException("Type information for left input of type " + leftInformation.getClass()
+ .getCanonicalName() + " is not supported. Could not generate a comparator.");
+ }
+
+ if (rightInformation instanceof AtomicType) {
+ rightComparator = ((AtomicType<IN2>) rightInformation).createComparator(true, executionConfig);
+ } else if (rightInformation instanceof CompositeType) {
+ int[] keyPositions = getKeyColumns(1);
+ boolean[] orders = new boolean[keyPositions.length];
+ Arrays.fill(orders, true);
+
+ rightComparator = ((CompositeType<IN2>) rightInformation).createComparator(keyPositions, orders, 0, executionConfig);
+ } else {
+ throw new RuntimeException("Type information for right input of type " + rightInformation.getClass()
+ .getCanonicalName() + " is not supported. Could not generate a comparator.");
+ }
+
+ TypePairComparator<IN1, IN2> pairComparator = new GenericPairComparator<IN1, IN2>(leftComparator, rightComparator);
+
+ List<OUT> result = new ArrayList<OUT>();
+ Collector<OUT> collector = new CopyingListCollector<OUT>(result, outInformation.createSerializer(executionConfig));
+
+ Map<Integer, List<IN2>> probeTable = new HashMap<Integer, List<IN2>>();
+
+ //Build hash table
+ for (IN2 element : inputData2) {
+ List<IN2> list = probeTable.get(rightComparator.hash(element));
+ if (list == null) {
+ list = new ArrayList<IN2>();
+ probeTable.put(rightComparator.hash(element), list);
+ }
+
+ list.add(element);
+ }
+
+ //Probing
+ for (IN1 left : inputData1) {
+ List<IN2> matchingHashes = probeTable.get(leftComparator.hash(left));
+
+ if (matchingHashes != null) {
+ pairComparator.setReference(left);
+ for (IN2 right : matchingHashes) {
+ if (pairComparator.equalToReference(right)) {
+ function.join(leftSerializer.copy(left), rightSerializer.copy(right), collector);
+ }
+ }
+ }
+ }
+
+ FunctionUtils.closeFunction(function);
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
index 799496c..98194e1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
@@ -18,207 +18,96 @@
package org.apache.flink.api.common.operators.base;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.CopyingListCollector;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.DualInputOperator;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.GenericPairComparator;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @see org.apache.flink.api.common.functions.FlatJoinFunction
- */
-public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> {
-
+
+public abstract class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN2, OUT>> extends DualInputOperator<IN1, IN2, OUT, FT> {
+
/**
* An enumeration of hints, optionally usable to tell the system how exactly execute the join.
*/
public static enum JoinHint {
-
+
/**
* Leave the choice how to do the join to the optimizer. If in doubt, the
* optimizer will choose a repartitioning join.
*/
OPTIMIZER_CHOOSES,
-
+
/**
* Hint that the first join input is much smaller than the second. This results in
* broadcasting and hashing the first input, unless the optimizer infers that
* prior existing partitioning is available that is even cheaper to exploit.
*/
BROADCAST_HASH_FIRST,
-
+
/**
* Hint that the second join input is much smaller than the first. This results in
* broadcasting and hashing the second input, unless the optimizer infers that
* prior existing partitioning is available that is even cheaper to exploit.
*/
BROADCAST_HASH_SECOND,
-
+
/**
* Hint that the first join input is a bit smaller than the second. This results in
* repartitioning both inputs and hashing the first input, unless the optimizer infers that
* prior existing partitioning and orders are available that are even cheaper to exploit.
*/
REPARTITION_HASH_FIRST,
-
+
/**
- * Hint that the second join input is a bit smaller than the second. This results in
+ * Hint that the second join input is a bit smaller than the first. This results in
* repartitioning both inputs and hashing the second input, unless the optimizer infers that
* prior existing partitioning and orders are available that are even cheaper to exploit.
*/
REPARTITION_HASH_SECOND,
-
+
/**
* Hint that the join should repartitioning both inputs and use sorting and merging
* as the join strategy.
*/
REPARTITION_SORT_MERGE
- };
-
- // --------------------------------------------------------------------------------------------
-
-
+ }
+
private JoinHint joinHint = JoinHint.OPTIMIZER_CHOOSES;
-
private Partitioner<?> partitioner;
-
-
+
+
public JoinOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) {
super(udf, operatorInfo, keyPositions1, keyPositions2, name);
}
public JoinOperatorBase(FT udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) {
- super(new UserCodeObjectWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name);
+ super(new UserCodeObjectWrapper<>(udf), operatorInfo, keyPositions1, keyPositions2, name);
}
-
+
public JoinOperatorBase(Class<? extends FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo, int[] keyPositions1, int[] keyPositions2, String name) {
- super(new UserCodeClassWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name);
+ super(new UserCodeClassWrapper<>(udf), operatorInfo, keyPositions1, keyPositions2, name);
}
-
-
+
+
public void setJoinHint(JoinHint joinHint) {
if (joinHint == null) {
throw new IllegalArgumentException("Join Hint must not be null.");
}
this.joinHint = joinHint;
}
-
+
public JoinHint getJoinHint() {
return joinHint;
}
-
+
public void setCustomPartitioner(Partitioner<?> partitioner) {
this.partitioner = partitioner;
}
-
+
public Partitioner<?> getCustomPartitioner() {
return partitioner;
}
-
- // --------------------------------------------------------------------------------------------
-
- @SuppressWarnings("unchecked")
- @Override
- protected List<OUT> executeOnCollections(List<IN1> inputData1, List<IN2> inputData2, RuntimeContext runtimeContext, ExecutionConfig executionConfig) throws Exception {
- FlatJoinFunction<IN1, IN2, OUT> function = userFunction.getUserCodeObject();
-
- FunctionUtils.setFunctionRuntimeContext(function, runtimeContext);
- FunctionUtils.openFunction(function, this.parameters);
-
- TypeInformation<IN1> leftInformation = getOperatorInfo().getFirstInputType();
- TypeInformation<IN2> rightInformation = getOperatorInfo().getSecondInputType();
- TypeInformation<OUT> outInformation = getOperatorInfo().getOutputType();
- TypeSerializer<IN1> leftSerializer = leftInformation.createSerializer(executionConfig);
- TypeSerializer<IN2> rightSerializer = rightInformation.createSerializer(executionConfig);
-
- TypeComparator<IN1> leftComparator;
- TypeComparator<IN2> rightComparator;
-
- if (leftInformation instanceof AtomicType) {
- leftComparator = ((AtomicType<IN1>) leftInformation).createComparator(true, executionConfig);
- }
- else if (leftInformation instanceof CompositeType) {
- int[] keyPositions = getKeyColumns(0);
- boolean[] orders = new boolean[keyPositions.length];
- Arrays.fill(orders, true);
-
- leftComparator = ((CompositeType<IN1>) leftInformation).createComparator(keyPositions, orders, 0, executionConfig);
- }
- else {
- throw new RuntimeException("Type information for left input of type " + leftInformation.getClass()
- .getCanonicalName() + " is not supported. Could not generate a comparator.");
- }
-
- if (rightInformation instanceof AtomicType) {
- rightComparator = ((AtomicType<IN2>) rightInformation).createComparator(true, executionConfig);
- }
- else if (rightInformation instanceof CompositeType) {
- int[] keyPositions = getKeyColumns(1);
- boolean[] orders = new boolean[keyPositions.length];
- Arrays.fill(orders, true);
-
- rightComparator = ((CompositeType<IN2>) rightInformation).createComparator(keyPositions, orders, 0, executionConfig);
- }
- else {
- throw new RuntimeException("Type information for right input of type " + rightInformation.getClass()
- .getCanonicalName() + " is not supported. Could not generate a comparator.");
- }
-
- TypePairComparator<IN1, IN2> pairComparator = new GenericPairComparator<IN1, IN2>(leftComparator, rightComparator);
-
- List<OUT> result = new ArrayList<OUT>();
- Collector<OUT> collector = new CopyingListCollector<OUT>(result, outInformation.createSerializer(executionConfig));
-
- Map<Integer, List<IN2>> probeTable = new HashMap<Integer, List<IN2>>();
-
- //Build hash table
- for (IN2 element: inputData2){
- List<IN2> list = probeTable.get(rightComparator.hash(element));
- if(list == null){
- list = new ArrayList<IN2>();
- probeTable.put(rightComparator.hash(element), list);
- }
-
- list.add(element);
- }
-
- //Probing
- for (IN1 left: inputData1) {
- List<IN2> matchingHashes = probeTable.get(leftComparator.hash(left));
-
- if (matchingHashes != null) {
- pairComparator.setReference(left);
- for (IN2 right : matchingHashes) {
- if (pairComparator.equalToReference(right)) {
- function.join(leftSerializer.copy(left), rightSerializer.copy(right), collector);
- }
- }
- }
- }
-
- FunctionUtils.closeFunction(function);
-
- return result;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
index 7666d10..02c8981 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
@@ -45,7 +45,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN2, OUT>> extends AbstractJoinOperatorBase<IN1, IN2, OUT, FT> {
+public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN2, OUT>> extends JoinOperatorBase<IN1, IN2, OUT, FT> {
public static enum OuterJoinType {LEFT, RIGHT, FULL}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
new file mode 100644
index 0000000..04505d0
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RichFlatJoinFunction;
+import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@SuppressWarnings("serial")
+public class InnerJoinOperatorBaseTest implements Serializable {
+
+ @Test
+ public void testJoinPlain(){
+ final FlatJoinFunction<String, String, Integer> joiner = new FlatJoinFunction<String, String, Integer>() {
+
+ @Override
+ public void join(String first, String second, Collector<Integer> out) throws Exception {
+ out.collect(first.length());
+ out.collect(second.length());
+ }
+ };
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ InnerJoinOperatorBase<String, String, Integer,
+ FlatJoinFunction<String, String,Integer> > base = new InnerJoinOperatorBase(joiner,
+ new BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO), new int[0], new int[0], "TestJoiner");
+
+ List<String> inputData1 = new ArrayList<String>(Arrays.asList("foo", "bar", "foobar"));
+ List<String> inputData2 = new ArrayList<String>(Arrays.asList("foobar", "foo"));
+ List<Integer> expected = new ArrayList<Integer>(Arrays.asList(3, 3, 6 ,6));
+
+ try {
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.disableObjectReuse();
+ List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, null, executionConfig);
+ executionConfig.enableObjectReuse();
+ List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, null, executionConfig);
+
+ assertEquals(expected, resultSafe);
+ assertEquals(expected, resultRegular);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testJoinRich(){
+ final AtomicBoolean opened = new AtomicBoolean(false);
+ final AtomicBoolean closed = new AtomicBoolean(false);
+ final String taskName = "Test rich join function";
+
+ final RichFlatJoinFunction<String, String, Integer> joiner = new RichFlatJoinFunction<String, String, Integer>() {
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ opened.compareAndSet(false, true);
+ assertEquals(0, getRuntimeContext().getIndexOfThisSubtask());
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+
+ @Override
+ public void close() throws Exception{
+ closed.compareAndSet(false, true);
+ }
+
+ @Override
+ public void join(String first, String second, Collector<Integer> out) throws Exception {
+ out.collect(first.length());
+ out.collect(second.length());
+ }
+ };
+
+ InnerJoinOperatorBase<String, String, Integer,
+ RichFlatJoinFunction<String, String, Integer>> base = new InnerJoinOperatorBase<String, String, Integer,
+ RichFlatJoinFunction<String, String, Integer>>(joiner, new BinaryOperatorInformation<String, String,
+ Integer>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO), new int[0], new int[0], taskName);
+
+ final List<String> inputData1 = new ArrayList<String>(Arrays.asList("foo", "bar", "foobar"));
+ final List<String> inputData2 = new ArrayList<String>(Arrays.asList("foobar", "foo"));
+ final List<Integer> expected = new ArrayList<Integer>(Arrays.asList(3, 3, 6, 6));
+
+
+ try {
+ final HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<String, Accumulator<?, ?>>();
+ final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
+
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.disableObjectReuse();
+ List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
+ executionConfig.enableObjectReuse();
+ List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
+
+ assertEquals(expected, resultSafe);
+ assertEquals(expected, resultRegular);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ assertTrue(opened.get());
+ assertTrue(closed.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
deleted file mode 100644
index 6d4ff33..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.operators.base;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.RichFlatJoinFunction;
-import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
-import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-@SuppressWarnings("serial")
-public class JoinOperatorBaseTest implements Serializable {
-
- @Test
- public void testJoinPlain(){
- final FlatJoinFunction<String, String, Integer> joiner = new FlatJoinFunction<String, String, Integer>() {
-
- @Override
- public void join(String first, String second, Collector<Integer> out) throws Exception {
- out.collect(first.length());
- out.collect(second.length());
- }
- };
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- JoinOperatorBase<String, String, Integer,
- FlatJoinFunction<String, String,Integer> > base = new JoinOperatorBase(joiner,
- new BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO), new int[0], new int[0], "TestJoiner");
-
- List<String> inputData1 = new ArrayList<String>(Arrays.asList("foo", "bar", "foobar"));
- List<String> inputData2 = new ArrayList<String>(Arrays.asList("foobar", "foo"));
- List<Integer> expected = new ArrayList<Integer>(Arrays.asList(3, 3, 6 ,6));
-
- try {
- ExecutionConfig executionConfig = new ExecutionConfig();
- executionConfig.disableObjectReuse();
- List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, null, executionConfig);
- executionConfig.enableObjectReuse();
- List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, null, executionConfig);
-
- assertEquals(expected, resultSafe);
- assertEquals(expected, resultRegular);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testJoinRich(){
- final AtomicBoolean opened = new AtomicBoolean(false);
- final AtomicBoolean closed = new AtomicBoolean(false);
- final String taskName = "Test rich join function";
-
- final RichFlatJoinFunction<String, String, Integer> joiner = new RichFlatJoinFunction<String, String, Integer>() {
- @Override
- public void open(Configuration parameters) throws Exception {
- opened.compareAndSet(false, true);
- assertEquals(0, getRuntimeContext().getIndexOfThisSubtask());
- assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
- }
-
- @Override
- public void close() throws Exception{
- closed.compareAndSet(false, true);
- }
-
- @Override
- public void join(String first, String second, Collector<Integer> out) throws Exception {
- out.collect(first.length());
- out.collect(second.length());
- }
- };
-
- JoinOperatorBase<String, String, Integer,
- RichFlatJoinFunction<String, String, Integer>> base = new JoinOperatorBase<String, String, Integer,
- RichFlatJoinFunction<String, String, Integer>>(joiner, new BinaryOperatorInformation<String, String,
- Integer>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO), new int[0], new int[0], taskName);
-
- final List<String> inputData1 = new ArrayList<String>(Arrays.asList("foo", "bar", "foobar"));
- final List<String> inputData2 = new ArrayList<String>(Arrays.asList("foobar", "foo"));
- final List<Integer> expected = new ArrayList<Integer>(Arrays.asList(3, 3, 6, 6));
-
-
- try {
- final HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<String, Accumulator<?, ?>>();
- final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
-
- ExecutionConfig executionConfig = new ExecutionConfig();
- executionConfig.disableObjectReuse();
- List<Integer> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
- executionConfig.enableObjectReuse();
- List<Integer> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig);
-
- assertEquals(expected, resultSafe);
- assertEquals(expected, resultRegular);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- assertTrue(opened.get());
- assertTrue(closed.get());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 98a94c6..6c8df21 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -63,7 +63,7 @@ import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.GroupCombineOperator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets;
+import org.apache.flink.api.java.operators.join.InnerJoinOperatorSets;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.MapPartitionOperator;
@@ -75,6 +75,8 @@ import org.apache.flink.api.java.operators.SortPartitionOperator;
import org.apache.flink.api.java.operators.SortedGrouping;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
+import org.apache.flink.api.java.operators.join.JoinOperatorSets;
+import org.apache.flink.api.java.operators.join.JoinType;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
@@ -290,8 +292,8 @@ public abstract class DataSet<T> {
/**
* Applies a Project transformation on a {@link Tuple} {@link DataSet}.<br/>
- * <b>Note: Only Tuple DataSets can be projected using field indexes.</b></br>
- * The transformation projects each Tuple of the DataSet onto a (sub)set of fields.</br>
+ * <b>Note: Only Tuple DataSets can be projected using field indexes.</b><br/>
+ * The transformation projects each Tuple of the DataSet onto a (sub)set of fields.<br/>
* Additional fields can be added to the projection by calling {@link ProjectOperator#project(int[])}.
*
* <b>Note: With the current implementation, the Project transformation looses type information.</b>
@@ -638,7 +640,7 @@ public abstract class DataSet<T> {
/**
* Groups a {@link DataSet} using a {@link KeySelector} function.
* The KeySelector function is called for each element of the DataSet and extracts a single
- * key value on which the DataSet is grouped. </br>
+ * key value on which the DataSet is grouped. <br/>
* This method returns an {@link UnsortedGrouping} on which one of the following grouping transformation
* can be applied.
* <ul>
@@ -665,7 +667,7 @@ public abstract class DataSet<T> {
/**
* Groups a {@link Tuple} {@link DataSet} using field position keys.<br/>
- * <b>Note: Field position keys only be specified for Tuple DataSets.</b></br>
+ * <b>Note: Field position keys can only be specified for Tuple DataSets.</b><br/>
* The field position keys specify the fields of Tuples on which the DataSet is grouped.
* This method returns an {@link UnsortedGrouping} on which one of the following grouping transformation
* can be applied.
@@ -725,83 +727,204 @@ public abstract class DataSet<T> {
* Initiates a Join transformation. <br/>
* A Join transformation joins the elements of two
* {@link DataSet DataSets} on key equality and provides multiple ways to combine
- * joining elements into one DataSet.</br>
+ * joining elements into one DataSet.<br/>
*
- * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods
+ * This method returns an {@link InnerJoinOperatorSets} on which one of the {@code where} methods
* can be called to define the join key of the first joining (i.e., this) DataSet.
*
* @param other The other DataSet with which this DataSet is joined.
- * @return A JoinOperatorSets to continue the definition of the Join transformation.
+ * @return An InnerJoinOperatorSets to continue the definition of the Join transformation.
*
- * @see JoinOperatorSets
+ * @see InnerJoinOperatorSets
* @see DataSet
*/
- public <R> JoinOperatorSets<T, R> join(DataSet<R> other) {
- return new JoinOperatorSets<T, R>(this, other);
+ public <R> InnerJoinOperatorSets<T, R> join(DataSet<R> other) {
+ return new InnerJoinOperatorSets<T, R>(this, other);
}
/**
* Initiates a Join transformation. <br/>
* A Join transformation joins the elements of two
* {@link DataSet DataSets} on key equality and provides multiple ways to combine
- * joining elements into one DataSet.</br>
+ * joining elements into one DataSet.<br/>
*
- * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods
+ * This method returns an {@link InnerJoinOperatorSets} on which one of the {@code where} methods
* can be called to define the join key of the first joining (i.e., this) DataSet.
*
* @param other The other DataSet with which this DataSet is joined.
- * @param strategy The strategy that should be used execute the join. If {@code null} is give, then the
+ * @param strategy The strategy that should be used to execute the join. If {@code null} is given, then the
* optimizer will pick the join strategy.
- * @return A JoinOperatorSets to continue the definition of the Join transformation.
+ * @return An InnerJoinOperatorSets to continue the definition of the Join transformation.
*
- * @see JoinOperatorSets
+ * @see InnerJoinOperatorSets
* @see DataSet
*/
- public <R> JoinOperatorSets<T, R> join(DataSet<R> other, JoinHint strategy) {
- return new JoinOperatorSets<T, R>(this, other, strategy);
+ public <R> InnerJoinOperatorSets<T, R> join(DataSet<R> other, JoinHint strategy) {
+ return new InnerJoinOperatorSets<T, R>(this, other, strategy);
}
/**
* Initiates a Join transformation. <br/>
* A Join transformation joins the elements of two
* {@link DataSet DataSets} on key equality and provides multiple ways to combine
- * joining elements into one DataSet.</br>
+ * joining elements into one DataSet.<br/>
* This method also gives the hint to the optimizer that the second DataSet to join is much
- * smaller than the first one.</br>
- * This method returns a {@link JoinOperatorSets} on which
- * {@link JoinOperatorSets#where(String...)} needs to be called to define the join key of the first
+ * smaller than the first one.<br/>
+ * This method returns an {@link InnerJoinOperatorSets} on which
+ * {@link InnerJoinOperatorSets#where(String...)} needs to be called to define the join key of the first
* joining (i.e., this) DataSet.
*
* @param other The other DataSet with which this DataSet is joined.
- * @return A JoinOperatorSets to continue the definition of the Join transformation.
+ * @return An InnerJoinOperatorSets to continue the definition of the Join transformation.
*
- * @see JoinOperatorSets
+ * @see InnerJoinOperatorSets
* @see DataSet
*/
- public <R> JoinOperatorSets<T, R> joinWithTiny(DataSet<R> other) {
- return new JoinOperatorSets<T, R>(this, other, JoinHint.BROADCAST_HASH_SECOND);
+ public <R> InnerJoinOperatorSets<T, R> joinWithTiny(DataSet<R> other) {
+ return new InnerJoinOperatorSets<T, R>(this, other, JoinHint.BROADCAST_HASH_SECOND);
}
/**
* Initiates a Join transformation.<br/>
* A Join transformation joins the elements of two
* {@link DataSet DataSets} on key equality and provides multiple ways to combine
- * joining elements into one DataSet.</br>
+ * joining elements into one DataSet.<br/>
* This method also gives the hint to the optimizer that the second DataSet to join is much
- * larger than the first one.</br>
- * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods
+ * larger than the first one.<br/>
+ * This method returns an {@link InnerJoinOperatorSets} on which one of the {@code where} methods
* can be called to define the join key of the first joining (i.e., this) DataSet.
*
* @param other The other DataSet with which this DataSet is joined.
* @return A JoinOperatorSet to continue the definition of the Join transformation.
*
+ * @see InnerJoinOperatorSets
+ * @see DataSet
+ */
+ public <R> InnerJoinOperatorSets<T, R> joinWithHuge(DataSet<R> other) {
+ return new InnerJoinOperatorSets<T, R>(this, other, JoinHint.BROADCAST_HASH_FIRST);
+ }
+
+ /**
+ * Initiates a Left Outer Join transformation.<br/>
+ * An Outer Join transformation joins the elements of two
+ * {@link DataSet DataSets} on key equality and provides multiple ways to combine
+ * joining elements into one DataSet.<br/>
+ * Elements of the <b>left</b> DataSet (i.e. {@code this}) that do not have a matching
+ * element on the other side are joined with {@code null} and emitted to the
+ * resulting DataSet.
+ *
+ * @param other The other DataSet with which this DataSet is joined.
+ * @return A JoinOperatorSets to continue the definition of the Join transformation.
+ *
* @see JoinOperatorSets
* @see DataSet
*/
- public <R> JoinOperatorSets<T, R> joinWithHuge(DataSet<R> other) {
- return new JoinOperatorSets<T, R>(this, other, JoinHint.BROADCAST_HASH_FIRST);
+ public <R> JoinOperatorSets<T, R> leftOuterJoin(DataSet<R> other) {
+ return new JoinOperatorSets<>(this, other, JoinHint.OPTIMIZER_CHOOSES, JoinType.LEFT_OUTER);
}
-
+
+ /**
+ * Initiates a Left Outer Join transformation.<br/>
+ * An Outer Join transformation joins the elements of two
+ * {@link DataSet DataSets} on key equality and provides multiple ways to combine
+ * joining elements into one DataSet.<br/>
+ * Elements of the <b>left</b> DataSet (i.e. {@code this}) that do not have a matching
+ * element on the other side are joined with {@code null} and emitted to the
+ * resulting DataSet.
+ *
+ * @param other The other DataSet with which this DataSet is joined.
+ * @param strategy The strategy that should be used to execute the join. If {@code null} is given, then the
+ * optimizer will pick the join strategy.
+ * @return A JoinOperatorSets to continue the definition of the Join transformation.
+ *
+ * @see JoinOperatorSets
+ * @see DataSet
+ */
+ public <R> JoinOperatorSets<T, R> leftOuterJoin(DataSet<R> other, JoinHint strategy) {
+ return new JoinOperatorSets<>(this, other, strategy, JoinType.LEFT_OUTER);
+ }
+
+ /**
+ * Initiates a Right Outer Join transformation.<br/>
+ * An Outer Join transformation joins the elements of two
+ * {@link DataSet DataSets} on key equality and provides multiple ways to combine
+ * joining elements into one DataSet.<br/>
+ * Elements of the <b>right</b> DataSet (i.e. {@code other}) that do not have a matching
+ * element on {@code this} side are joined with {@code null} and emitted to the
+ * resulting DataSet.
+ *
+ * @param other The other DataSet with which this DataSet is joined.
+ * @return A JoinOperatorSets to continue the definition of the Join transformation.
+ *
+ * @see JoinOperatorSets
+ * @see DataSet
+ */
+ public <R> JoinOperatorSets<T, R> rightOuterJoin(DataSet<R> other) {
+ return new JoinOperatorSets<>(this, other, JoinHint.OPTIMIZER_CHOOSES, JoinType.RIGHT_OUTER);
+ }
+
+ /**
+ * Initiates a Right Outer Join transformation.<br/>
+ * An Outer Join transformation joins the elements of two
+ * {@link DataSet DataSets} on key equality and provides multiple ways to combine
+ * joining elements into one DataSet.<br/>
+ * Elements of the <b>right</b> DataSet (i.e. {@code other}) that do not have a matching
+ * element on {@code this} side are joined with {@code null} and emitted to the
+ * resulting DataSet.
+ *
+ * @param other The other DataSet with which this DataSet is joined.
+ * @param strategy The strategy that should be used to execute the join. If {@code null} is given, then the
+ * optimizer will pick the join strategy.
+ * @return A JoinOperatorSets to continue the definition of the Join transformation.
+ *
+ * @see JoinOperatorSets
+ * @see DataSet
+ */
+ public <R> JoinOperatorSets<T, R> rightOuterJoin(DataSet<R> other, JoinHint strategy) {
+ return new JoinOperatorSets<>(this, other, strategy, JoinType.RIGHT_OUTER);
+ }
+
+ /**
+ * Initiates a Full Outer Join transformation.<br/>
+ * An Outer Join transformation joins the elements of two
+ * {@link DataSet DataSets} on key equality and provides multiple ways to combine
+ * joining elements into one DataSet.<br/>
+ * Elements of <b>both</b> DataSets that do not have a matching
+ * element on the opposing side are joined with {@code null} and emitted to the
+ * resulting DataSet.
+ *
+ * @param other The other DataSet with which this DataSet is joined.
+ * @return A JoinOperatorSets to continue the definition of the Join transformation.
+ *
+ * @see JoinOperatorSets
+ * @see DataSet
+ */
+ public <R> JoinOperatorSets<T, R> fullOuterJoin(DataSet<R> other) {
+ return new JoinOperatorSets<>(this, other, JoinHint.OPTIMIZER_CHOOSES, JoinType.FULL_OUTER);
+ }
+
+ /**
+ * Initiates a Full Outer Join transformation.<br/>
+ * An Outer Join transformation joins the elements of two
+ * {@link DataSet DataSets} on key equality and provides multiple ways to combine
+ * joining elements into one DataSet.<br/>
+ * Elements of <b>both</b> DataSets that do not have a matching
+ * element on the opposing side are joined with {@code null} and emitted to the
+ * resulting DataSet.
+ *
+ * @param other The other DataSet with which this DataSet is joined.
+ * @param strategy The strategy that should be used to execute the join. If {@code null} is given, then the
+ * optimizer will pick the join strategy.
+ * @return A JoinOperatorSets to continue the definition of the Join transformation.
+ *
+ * @see JoinOperatorSets
+ * @see DataSet
+ */
+ public <R> JoinOperatorSets<T, R> fullOuterJoin(DataSet<R> other, JoinHint strategy) {
+ return new JoinOperatorSets<>(this, other, strategy, JoinType.FULL_OUTER);
+ }
+
+
// --------------------------------------------------------------------------------------------
// Co-Grouping
// --------------------------------------------------------------------------------------------
@@ -812,9 +935,9 @@ public abstract class DataSet<T> {
* two {@link DataSet DataSets} into one DataSet. It groups each DataSet individually on a key and
* gives groups of both DataSets with equal keys together into a {@link org.apache.flink.api.common.functions.RichCoGroupFunction}.
* If a DataSet has a group with no matching key in the other DataSet, the CoGroupFunction
- * is called with an empty group for the non-existing group.</br>
+ * is called with an empty group for the non-existing group.<br/>
* The CoGroupFunction can iterate over the elements of both groups and return any number
- * of elements including none.</br>
+ * of elements including none.<br/>
* This method returns a {@link CoGroupOperatorSets} on which one of the {@code where} methods
* can be called to define the join key of the first joining (i.e., this) DataSet.
*
@@ -860,7 +983,7 @@ public abstract class DataSet<T> {
* <p>
* Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
* {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
- * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.</br>
+ * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.<br/>
*
* @param other The other DataSet with which this DataSet is crossed.
* @return A DefaultCross that returns a Tuple2 for each pair of crossed elements.
@@ -890,7 +1013,7 @@ public abstract class DataSet<T> {
* <p>
* Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
* {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
- * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.</br>
+ * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.<br/>
*
* @param other The other DataSet with which this DataSet is crossed.
* @return A DefaultCross that returns a Tuple2 for each pair of crossed elements.
@@ -920,7 +1043,7 @@ public abstract class DataSet<T> {
* <p>
* Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a
* {@link org.apache.flink.api.common.functions.CrossFunction} which is called for
- * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.</br>
+ * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.<br/>
*
* @param other The other DataSet with which this DataSet is crossed.
* @return A DefaultCross that returns a Tuple2 for each pair of crossed elements.