You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/09 16:20:52 UTC
[3/6] flink git commit: [FLINK-2576] [javaAPI] [scalaAPI] [optimizer]
Add outerJoin to DataSet API (Java, Scala) and optimizer.
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
deleted file mode 100644
index 7390af2..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/JoinOperatorBaseTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.operators.base;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
-import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Future;
-
-@SuppressWarnings({ "unchecked", "serial" })
-public class JoinOperatorBaseTest implements Serializable {
-
-
- @Test
- public void testTupleBaseJoiner(){
- final FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer, String>, Tuple2<Double, String>> joiner =
- new FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer, String>, Tuple2<Double, String>>()
- {
- @Override
- public void join(Tuple3<String, Double, Integer> first, Tuple2<Integer, String> second, Collector<Tuple2<Double, String>> out) {
- Tuple3<String, Double, Integer> fst = (Tuple3<String, Double, Integer>)first;
- Tuple2<Integer, String> snd = (Tuple2<Integer, String>)second;
-
- assertEquals(fst.f0, snd.f1);
- assertEquals(fst.f2, snd.f0);
-
- out.collect(new Tuple2<Double, String>(fst.f1, snd.f0.toString()));
- }
- };
-
- final TupleTypeInfo<Tuple3<String, Double, Integer>> leftTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo
- (String.class, Double.class, Integer.class);
- final TupleTypeInfo<Tuple2<Integer, String>> rightTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class,
- String.class);
- final TupleTypeInfo<Tuple2<Double, String>> outTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Double.class,
- String.class);
-
- final int[] leftKeys = new int[]{0,2};
- final int[] rightKeys = new int[]{1,0};
-
- final String taskName = "Collection based tuple joiner";
-
- final BinaryOperatorInformation<Tuple3<String, Double, Integer>, Tuple2<Integer, String>, Tuple2<Double,
- String>> binaryOpInfo = new BinaryOperatorInformation<Tuple3<String, Double, Integer>, Tuple2<Integer,
- String>, Tuple2<Double, String>>(leftTypeInfo, rightTypeInfo, outTypeInfo);
-
- final JoinOperatorBase<Tuple3<String, Double, Integer>, Tuple2<Integer,
- String>, Tuple2<Double, String>, FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer,
- String>, Tuple2<Double, String>>> base = new JoinOperatorBase<Tuple3<String, Double, Integer>,
- Tuple2<Integer, String>, Tuple2<Double, String>, FlatJoinFunction<Tuple3<String, Double, Integer>,
- Tuple2<Integer, String>, Tuple2<Double, String>>>(joiner, binaryOpInfo, leftKeys, rightKeys, taskName);
-
- final List<Tuple3<String, Double, Integer> > inputData1 = new ArrayList<Tuple3<String, Double,
- Integer>>(Arrays.asList(
- new Tuple3<String, Double, Integer>("foo", 42.0, 1),
- new Tuple3<String,Double, Integer>("bar", 1.0, 2),
- new Tuple3<String, Double, Integer>("bar", 2.0, 3),
- new Tuple3<String, Double, Integer>("foobar", 3.0, 4),
- new Tuple3<String, Double, Integer>("bar", 3.0, 3)
- ));
-
- final List<Tuple2<Integer, String>> inputData2 = new ArrayList<Tuple2<Integer, String>>(Arrays.asList(
- new Tuple2<Integer, String>(3, "bar"),
- new Tuple2<Integer, String>(4, "foobar"),
- new Tuple2<Integer, String>(2, "foo")
- ));
- final Set<Tuple2<Double, String>> expected = new HashSet<Tuple2<Double, String>>(Arrays.asList(
- new Tuple2<Double, String>(2.0, "3"),
- new Tuple2<Double, String>(3.0, "3"),
- new Tuple2<Double, String>(3.0, "4")
- ));
-
- try {
- ExecutionConfig executionConfig = new ExecutionConfig();
- executionConfig.disableObjectReuse();
- List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
- executionConfig.enableObjectReuse();
- List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>()), executionConfig);
-
- assertEquals(expected, new HashSet<Tuple2<Double, String>>(resultSafe));
- assertEquals(expected, new HashSet<Tuple2<Double, String>>(resultRegular));
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
index e890b4e..916086b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.operators.DualInputSemanticProperties;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.base.CrossOperatorBase;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.translation.PlanProjectOperator;
@@ -138,7 +138,7 @@ public class SemanticPropertiesProjectionTest {
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
- JoinOperatorBase<?, ?, ?, ?> projectJoinOperator = ((JoinOperatorBase<?, ?, ?, ?>) sink.getInput());
+ InnerJoinOperatorBase<?, ?, ?, ?> projectJoinOperator = ((InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput());
DualInputSemanticProperties props = projectJoinOperator.getSemanticProperties();
@@ -166,7 +166,7 @@ public class SemanticPropertiesProjectionTest {
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
- JoinOperatorBase<?, ?, ?, ?> projectJoinOperator = ((JoinOperatorBase<?, ?, ?, ?>) sink.getInput());
+ InnerJoinOperatorBase<?, ?, ?, ?> projectJoinOperator = ((InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput());
DualInputSemanticProperties props = projectJoinOperator.getSemanticProperties();
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
index 33b3958..d01ca32 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.common.operators.DualInputSemanticProperties;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.SemanticProperties;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
@@ -265,7 +265,7 @@ public class SemanticPropertiesTranslationTest {
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
- JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput();
+ InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
DualInputSemanticProperties semantics = join.getSemanticProperties();
assertNotNull(semantics.getForwardingTargetFields(0, 0));
@@ -292,7 +292,7 @@ public class SemanticPropertiesTranslationTest {
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
- JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput();
+ InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
DualInputSemanticProperties semantics = join.getSemanticProperties();
assertNotNull(semantics.getForwardingTargetFields(1, 0));
@@ -319,7 +319,7 @@ public class SemanticPropertiesTranslationTest {
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
- JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput();
+ InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
DualInputSemanticProperties semantics = join.getSemanticProperties();
assertNotNull(semantics.getForwardingTargetFields(1, 0));
@@ -352,7 +352,7 @@ public class SemanticPropertiesTranslationTest {
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
- JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput();
+ InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
DualInputSemanticProperties semantics = join.getSemanticProperties();
assertNotNull(semantics.getForwardingTargetFields(0, 1));
@@ -382,7 +382,7 @@ public class SemanticPropertiesTranslationTest {
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
- JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput();
+ InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
DualInputSemanticProperties semantics = join.getSemanticProperties();
assertNotNull(semantics.getForwardingTargetFields(0, 1));
@@ -410,7 +410,7 @@ public class SemanticPropertiesTranslationTest {
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
- JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput();
+ InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
DualInputSemanticProperties semantics = join.getSemanticProperties();
assertNotNull(semantics.getForwardingTargetFields(0, 0));
@@ -440,7 +440,7 @@ public class SemanticPropertiesTranslationTest {
Plan plan = env.createProgramPlan();
GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next();
- JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput();
+ InnerJoinOperatorBase<?, ?, ?, ?> join = (InnerJoinOperatorBase<?, ?, ?, ?>) sink.getInput();
DualInputSemanticProperties semantics = join.getSemanticProperties();
assertNotNull(semantics.getReadFields(0));
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
index b8663ce..c14f175 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java
@@ -26,7 +26,7 @@ import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
@@ -102,7 +102,7 @@ public class NamesTest implements Serializable {
plan.accept(new Visitor<Operator<?>>() {
@Override
public boolean preVisit(Operator<?> visitable) {
- if(visitable instanceof JoinOperatorBase) {
+ if(visitable instanceof InnerJoinOperatorBase) {
Assert.assertEquals("Join at testJoinWith(NamesTest.java:93)", visitable.getName());
}
return true;
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
index f9ce82f..fd60bc6 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.base.DeltaIterationBase;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
@@ -123,8 +123,8 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
assertEquals(ITERATION_NAME, iteration.getName());
MapOperatorBase<?, ?, ?> nextWorksetMapper = (MapOperatorBase<?, ?, ?>) iteration.getNextWorkset();
- JoinOperatorBase<?, ?, ?, ?> solutionSetJoin = (JoinOperatorBase<?, ?, ?, ?>) iteration.getSolutionSetDelta();
- JoinOperatorBase<?, ?, ?, ?> worksetSelfJoin = (JoinOperatorBase<?, ?, ?, ?>) solutionSetJoin.getFirstInput();
+ InnerJoinOperatorBase<?, ?, ?, ?> solutionSetJoin = (InnerJoinOperatorBase<?, ?, ?, ?>) iteration.getSolutionSetDelta();
+ InnerJoinOperatorBase<?, ?, ?, ?> worksetSelfJoin = (InnerJoinOperatorBase<?, ?, ?, ?>) solutionSetJoin.getFirstInput();
MapOperatorBase<?, ?, ?> worksetMapper = (MapOperatorBase<?, ?, ?>) worksetSelfJoin.getFirstInput();
assertEquals(IdentityMapper.class, worksetMapper.getUserCodeWrapper().getUserCodeClass());
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
index 6a3ff09..553c127 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
@@ -196,6 +196,9 @@ public abstract class CostEstimator {
break;
case INNER_MERGE:
+ case FULL_OUTER_MERGE:
+ case LEFT_OUTER_MERGE:
+ case RIGHT_OUTER_MERGE:
addLocalMergeCost(firstInput, secondInput, driverCosts, costWeight);
break;
case HYBRIDHASH_BUILD_FIRST:
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
index 02c9b5b..383bbe1 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
@@ -23,7 +23,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
@@ -32,7 +32,7 @@ import org.apache.flink.optimizer.operators.AbstractJoinDescriptor;
import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties;
import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties;
import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-import org.apache.flink.optimizer.operators.SortMergeJoinDescriptor;
+import org.apache.flink.optimizer.operators.SortMergeInnerJoinDescriptor;
import org.apache.flink.configuration.Configuration;
/**
@@ -47,7 +47,7 @@ public class JoinNode extends TwoInputNode {
*
* @param joinOperatorBase The join operator object.
*/
- public JoinNode(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase) {
+ public JoinNode(InnerJoinOperatorBase<?, ?, ?, ?> joinOperatorBase) {
super(joinOperatorBase);
this.dataProperties = getDataProperties(joinOperatorBase,
@@ -62,8 +62,8 @@ public class JoinNode extends TwoInputNode {
* @return The contract.
*/
@Override
- public JoinOperatorBase<?, ?, ?, ?> getOperator() {
- return (JoinOperatorBase<?, ?, ?, ?>) super.getOperator();
+ public InnerJoinOperatorBase<?, ?, ?, ?> getOperator() {
+ return (InnerJoinOperatorBase<?, ?, ?, ?>) super.getOperator();
}
@Override
@@ -111,7 +111,7 @@ public class JoinNode extends TwoInputNode {
}
}
- private List<OperatorDescriptorDual> getDataProperties(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint,
+ private List<OperatorDescriptorDual> getDataProperties(InnerJoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint,
Partitioner<?> customPartitioner)
{
// see if an internal hint dictates the strategy to use
@@ -125,7 +125,7 @@ public class JoinNode extends TwoInputNode {
Optimizer.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) ||
Optimizer.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) )
{
- fixedDriverStrat = new SortMergeJoinDescriptor(this.keys1, this.keys2);
+ fixedDriverStrat = new SortMergeInnerJoinDescriptor(this.keys1, this.keys2);
}
else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) {
fixedDriverStrat = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
@@ -164,10 +164,10 @@ public class JoinNode extends TwoInputNode {
list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true));
break;
case REPARTITION_SORT_MERGE:
- list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2, false, false, true));
+ list.add(new SortMergeInnerJoinDescriptor(this.keys1, this.keys2, false, false, true));
break;
case OPTIMIZER_CHOOSES:
- list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2));
+ list.add(new SortMergeInnerJoinDescriptor(this.keys1, this.keys2));
list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
break;
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
deleted file mode 100644
index ee8ab05..0000000
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties;
-import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-import org.apache.flink.optimizer.operators.SortMergeJoinDescriptor;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * The Optimizer representation of a join operator.
- */
-public class MatchNode extends TwoInputNode {
-
- private List<OperatorDescriptorDual> dataProperties;
-
- /**
- * Creates a new MatchNode for the given join operator.
- *
- * @param joinOperatorBase The join operator object.
- */
- public MatchNode(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase) {
- super(joinOperatorBase);
- this.dataProperties = getDataProperties(joinOperatorBase, joinOperatorBase.getJoinHint());
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Gets the contract object for this match node.
- *
- * @return The contract.
- */
- @Override
- public JoinOperatorBase<?, ?, ?, ?> getOperator() {
- return (JoinOperatorBase<?, ?, ?, ?>) super.getOperator();
- }
-
- @Override
- public String getOperatorName() {
- return "Join";
- }
-
- @Override
- protected List<OperatorDescriptorDual> getPossibleProperties() {
- return this.dataProperties;
- }
-
- public void makeJoinWithSolutionSet(int solutionsetInputIndex) {
- OperatorDescriptorDual op;
- if (solutionsetInputIndex == 0) {
- op = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
- } else if (solutionsetInputIndex == 1) {
- op = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
- } else {
- throw new IllegalArgumentException();
- }
-
- this.dataProperties = Collections.singletonList(op);
- }
-
- /**
- * The default estimates build on the principle of inclusion: The smaller input key domain is included in the larger
- * input key domain. We also assume that every key from the larger input has one join partner in the smaller input.
- * The result cardinality is hence the larger one.
- */
- @Override
- protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
- long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
- long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
- this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : Math.max(card1, card2);
-
- if (this.estimatedNumRecords >= 0) {
- float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
- float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
- float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;
-
- if (width > 0) {
- this.estimatedOutputSize = (long) (width * this.estimatedNumRecords);
- }
- }
- }
-
- private List<OperatorDescriptorDual> getDataProperties(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint) {
- // see if an internal hint dictates the strategy to use
- Configuration conf = joinOperatorBase.getParameters();
- String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
-
- if (localStrategy != null) {
- final OperatorDescriptorDual fixedDriverStrat;
- if (Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) ||
- Optimizer.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) ||
- Optimizer.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) ||
- Optimizer.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) )
- {
- fixedDriverStrat = new SortMergeJoinDescriptor(this.keys1, this.keys2);
- } else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) {
- fixedDriverStrat = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
- } else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) {
- fixedDriverStrat = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
- } else {
- throw new CompilerException("Invalid local strategy hint for match contract: " + localStrategy);
- }
- ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
- list.add(fixedDriverStrat);
- return list;
- }
- else {
- ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
-
- joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint;
-
- switch (joinHint) {
- case BROADCAST_HASH_FIRST:
- list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false));
- break;
- case BROADCAST_HASH_SECOND:
- list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, true, false));
- break;
- case REPARTITION_HASH_FIRST:
- list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, false, false, true));
- break;
- case REPARTITION_HASH_SECOND:
- list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true));
- break;
- case REPARTITION_SORT_MERGE:
- list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2, false, false, true));
- break;
- case OPTIMIZER_CHOOSES:
- list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2));
- list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
- list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
- break;
- default:
- throw new CompilerException("Unrecognized join hint: " + joinHint);
- }
-
- return list;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java
new file mode 100644
index 0000000..ebdfcc8
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.AbstractJoinDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
+import org.apache.flink.optimizer.operators.SortMergeFullOuterJoinDescriptor;
+import org.apache.flink.optimizer.operators.SortMergeLeftOuterJoinDescriptor;
+import org.apache.flink.optimizer.operators.SortMergeRightOuterJoinDescriptor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Optimizer DAG node for an {@link OuterJoinOperatorBase} (left, right, or full outer join).
+ *
+ * <p>Only sort-merge execution is offered. The candidate operator descriptors are derived once,
+ * in the constructor, from the operator's outer join type and join hint.
+ */
+public class OuterJoinNode extends TwoInputNode {
+
+ // Candidate descriptors computed once in the constructor; returned by getPossibleProperties().
+ private List<OperatorDescriptorDual> dataProperties;
+
+ /**
+ * Creates a new two input node for the optimizer plan, representing the given operator,
+ * and precomputes the operator descriptors it may be executed with.
+ *
+ * @param operator The operator that the optimizer DAG node should represent.
+ * @throws CompilerException if the operator's join hint is not supported for outer joins.
+ */
+ public OuterJoinNode(OuterJoinOperatorBase<?, ?, ?, ?> operator) {
+ super(operator);
+
+ this.dataProperties = getDataProperties();
+ }
+
+ /**
+ * Builds the candidate descriptors for this outer join from its join hint.
+ *
+ * <ul>
+ * <li>{@code null} or {@code OPTIMIZER_CHOOSES}: one sort-merge descriptor that may broadcast
+ * the non-preserved input (only relevant for left/right outer joins).</li>
+ * <li>{@code REPARTITION_SORT_MERGE}: one sort-merge descriptor without broadcasting.</li>
+ * <li>Any hash-based or broadcast-hash hint: rejected.</li>
+ * </ul>
+ * If the operator carries a custom partitioner, it is set on every descriptor.
+ *
+ * @return the candidate descriptors (exactly one sort-merge descriptor)
+ * @throws CompilerException for join hints that outer joins do not support
+ */
+ private List<OperatorDescriptorDual> getDataProperties() {
+ OuterJoinOperatorBase<?, ?, ?, ?> operator = getOperator();
+
+ OuterJoinType type = operator.getOuterJoinType();
+
+ JoinHint joinHint = operator.getJoinHint();
+ // a missing hint means the optimizer is free to choose
+ joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint;
+
+ List<OperatorDescriptorDual> list = new ArrayList<>();
+ switch (joinHint) {
+ case OPTIMIZER_CHOOSES:
+ list.add(getSortMergeDescriptor(type, true));
+ break;
+ case REPARTITION_SORT_MERGE:
+ list.add(getSortMergeDescriptor(type, false));
+ break;
+ // hash-based strategies are not offered for outer joins: listed explicitly so they
+ // share the rejection path with any unknown hint
+ case REPARTITION_HASH_FIRST:
+ case REPARTITION_HASH_SECOND:
+ case BROADCAST_HASH_FIRST:
+ case BROADCAST_HASH_SECOND:
+ default:
+ throw new CompilerException("Invalid join hint: " + joinHint + " for outer join type: " + type);
+ }
+
+ Partitioner<?> customPartitioner = operator.getCustomPartitioner();
+ if (customPartitioner != null) {
+ for (OperatorDescriptorDual desc : list) {
+ // every descriptor added above comes from getSortMergeDescriptor and is an
+ // AbstractSortMergeJoinDescriptor, i.e. an AbstractJoinDescriptor
+ ((AbstractJoinDescriptor) desc).setCustomPartitioner(customPartitioner);
+ }
+ }
+ return list;
+ }
+
+ /**
+ * Returns the sort-merge descriptor matching the given outer join type.
+ *
+ * @param type the outer join type; any value other than FULL or LEFT is treated as RIGHT
+ * @param broadcastAllowed whether the non-preserved input of a left/right outer join may be
+ * broadcast; ignored for full outer joins
+ * @return the sort-merge descriptor for {@code type}
+ */
+ private OperatorDescriptorDual getSortMergeDescriptor(OuterJoinType type, boolean broadcastAllowed) {
+ if (type == OuterJoinType.FULL) {
+ return new SortMergeFullOuterJoinDescriptor(this.keys1, this.keys2);
+ } else if (type == OuterJoinType.LEFT) {
+ return new SortMergeLeftOuterJoinDescriptor(this.keys1, this.keys2, broadcastAllowed);
+ } else {
+ return new SortMergeRightOuterJoinDescriptor(this.keys1, this.keys2, broadcastAllowed);
+ }
+ }
+
+ /** Returns the represented operator, cast to {@link OuterJoinOperatorBase}. */
+ @Override
+ public OuterJoinOperatorBase<?, ?, ?, ?> getOperator() {
+ return (OuterJoinOperatorBase<?, ?, ?, ?>) super.getOperator();
+ }
+
+ /** Returns the candidate descriptors computed in the constructor. */
+ @Override
+ protected List<OperatorDescriptorDual> getPossibleProperties() {
+ return dataProperties;
+ }
+
+ @Override
+ public String getOperatorName() {
+ return "Outer Join";
+ }
+
+ /**
+ * Estimates the output cardinality as the larger of the two input cardinalities, or -1 (unknown)
+ * if either input cardinality is unknown. When the cardinality is known and both inputs report a
+ * positive average record width, the output size is estimated as cardinality times the sum of
+ * both widths; otherwise the output size estimate is not set here.
+ *
+ * <p>NOTE(review): max(card1, card2) is a heuristic. A full outer join with no matches emits
+ * card1 + card2 records, so the true count can be considerably larger.
+ */
+ @Override
+ protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+ long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
+ long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
+
+ if (card1 < 0 || card2 < 0) {
+ this.estimatedNumRecords = -1;
+ } else {
+ this.estimatedNumRecords = Math.max(card1, card2);
+ }
+
+ if (this.estimatedNumRecords >= 0) {
+ float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
+ float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
+ // -1 marks an unknown width if either input's width is unknown (non-positive)
+ float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;
+
+ if (width > 0) {
+ this.estimatedOutputSize = (long) (width * this.estimatedNumRecords);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java
new file mode 100644
index 0000000..d54b5cf
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractSortMergeJoinDescriptor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.util.Utils;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Common base for sort-merge join descriptors (inner, left outer, right outer and full outer).
+ *
+ * <p>Both inputs are required to be sorted on their respective join keys. Subclasses supply the
+ * runtime driver strategy via {@code getStrategy()} and the plan-node name prefix via
+ * {@link #getNodeName()}.
+ */
+public abstract class AbstractSortMergeJoinDescriptor extends AbstractJoinDescriptor {
+
+ /**
+ * Creates a descriptor with the default shipping options of {@code AbstractJoinDescriptor}.
+ *
+ * @param keys1 join key fields of the first input
+ * @param keys2 join key fields of the second input
+ */
+ public AbstractSortMergeJoinDescriptor(FieldList keys1, FieldList keys2) {
+ super(keys1, keys2);
+ }
+
+ /**
+ * Creates a descriptor with explicit shipping options.
+ *
+ * @param keys1 join key fields of the first input
+ * @param keys2 join key fields of the second input
+ * @param broadcastFirstAllowed whether the first input may be broadcast
+ * @param broadcastSecondAllowed whether the second input may be broadcast
+ * @param repartitionAllowed whether both inputs may be repartitioned
+ */
+ public AbstractSortMergeJoinDescriptor(FieldList keys1, FieldList keys2,
+ boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed) {
+ super(keys1, keys2, broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed);
+ }
+
+ /** Requests a single local-property pair: each input sorted on its own join keys. */
+ @Override
+ protected List<LocalPropertiesPair> createPossibleLocalProperties() {
+ RequestedLocalProperties sort1 = new RequestedLocalProperties(Utils.createOrdering(this.keys1));
+ RequestedLocalProperties sort2 = new RequestedLocalProperties(Utils.createOrdering(this.keys2));
+ return Collections.singletonList(new LocalPropertiesPair(sort1, sort2));
+ }
+
+ /**
+ * Checks that the orderings actually produced by both inputs agree on the join keys, in both
+ * field order and sort direction. Only the produced properties are inspected; the requested
+ * ones are not consulted. Throws a CompilerException if either input has no ordering.
+ */
+ @Override
+ public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
+ LocalProperties produced1, LocalProperties produced2) {
+ int numRelevantFields = this.keys1.size();
+ return checkSameOrdering(produced1, produced2, numRelevantFields);
+ }
+
+ /**
+ * Creates the dual-input plan node for this strategy. The per-key sort directions are taken
+ * from the first input's ordering and truncated to the number of join keys. The node is named
+ * {@code "<getNodeName()>(<operator name>)"}.
+ *
+ * @throws CompilerException if the first input's ordering covers fewer fields than the join keys
+ */
+ @Override
+ public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+ // assumes in1 is already known to be ordered (getOrdering() is not null-checked here)
+ boolean[] inputOrders = in1.getLocalProperties().getOrdering().getFieldSortDirections();
+
+ if (inputOrders == null || inputOrders.length < this.keys1.size()) {
+ throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a merge operator.");
+ } else if (inputOrders.length > this.keys1.size()) {
+ // the input may be sorted on more fields than the keys; keep only the key prefix
+ boolean[] tmp = new boolean[this.keys1.size()];
+ System.arraycopy(inputOrders, 0, tmp, 0, tmp.length);
+ inputOrders = tmp;
+ }
+
+ String nodeName = String.format("%s(%s)", getNodeName(), node.getOperator().getName());
+ return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2, inputOrders);
+ }
+
+ /**
+ * Combines both inputs' local properties and drops any unique field sets, presumably because
+ * a join can emit several records per key, so input uniqueness no longer holds.
+ */
+ @Override
+ public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
+ LocalProperties comb = LocalProperties.combine(in1, in2);
+ return comb.clearUniqueFieldSets();
+ }
+
+ /** Returns the prefix of the plan node name, e.g. {@code "Join"} in {@code "Join(<operator name>)"}. */
+ protected abstract String getNodeName();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
index 368944e..571d6e6 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
@@ -177,30 +177,9 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
@Override
public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
- LocalProperties produced1, LocalProperties produced2)
- {
+ LocalProperties produced1, LocalProperties produced2) {
int numRelevantFields = this.keys1.size();
-
- Ordering prod1 = produced1.getOrdering();
- Ordering prod2 = produced2.getOrdering();
-
- if (prod1 == null || prod2 == null) {
- throw new CompilerException("The given properties do not meet this operators requirements.");
- }
-
- // check that order of fields is equivalent
- if (!checkEquivalentFieldPositionsInKeyFields(
- prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) {
- return false;
- }
-
- // check that order directions are equivalent
- for (int i = 0; i < numRelevantFields; i++) {
- if (prod1.getOrder(i) != prod2.getOrder(i)) {
- return false;
- }
- }
- return true;
+ return checkSameOrdering(produced1, produced2, numRelevantFields);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
index c21593e..17ea8a5 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
@@ -21,6 +21,7 @@ package org.apache.flink.optimizer.operators;
import java.util.List;
+import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.dag.TwoInputNode;
@@ -124,6 +125,29 @@ public abstract class OperatorDescriptorDual implements AbstractOperatorDescript
return true;
}
+ /**
+ * Checks whether the orderings produced by two inputs are equivalent on their leading
+ * {@code numRelevantFields} fields. The ordered field positions must be equivalent according to
+ * {@code checkEquivalentFieldPositionsInKeyFields}, and each of those positions must use the
+ * same sort direction in both orderings.
+ *
+ * @param produced1 local properties produced by the first input
+ * @param produced2 local properties produced by the second input
+ * @param numRelevantFields number of leading ordered fields to compare
+ * @return true if both orderings match on the relevant fields, false otherwise
+ * @throws CompilerException if either input has no ordering
+ */
+ protected boolean checkSameOrdering(LocalProperties produced1, LocalProperties produced2, int numRelevantFields) {
+ Ordering prod1 = produced1.getOrdering();
+ Ordering prod2 = produced2.getOrdering();
+
+ if (prod1 == null || prod2 == null) {
+ throw new CompilerException("The given properties do not meet this operators requirements.");
+ }
+
+ // check that order of fields is equivalent
+ if (!checkEquivalentFieldPositionsInKeyFields(
+ prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) {
+ return false;
+ }
+
+ // check that both inputs have the same directions of order
+ for (int i = 0; i < numRelevantFields; i++) {
+ if (prod1.getOrder(i) != prod2.getOrder(i)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
// --------------------------------------------------------------------------------------------
public static final class GlobalPropertiesPair {
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeFullOuterJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeFullOuterJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeFullOuterJoinDescriptor.java
new file mode 100644
index 0000000..4e05067
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeFullOuterJoinDescriptor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * Sort-merge descriptor for full outer joins ({@code FULL_OUTER_MERGE}).
+ *
+ * <p>Broadcasting is disabled for both inputs and only repartitioning is allowed. Unmatched
+ * records from either side must be emitted exactly once, which a broadcast copy on every
+ * partition would violate.
+ */
+public class SortMergeFullOuterJoinDescriptor extends AbstractSortMergeJoinDescriptor {
+
+ /**
+ * @param keys1 join key fields of the first input
+ * @param keys2 join key fields of the second input
+ */
+ public SortMergeFullOuterJoinDescriptor(FieldList keys1, FieldList keys2) {
+ // no broadcast of either input; repartitioning only
+ super(keys1, keys2, false, false, true);
+ }
+
+ @Override
+ public DriverStrategy getStrategy() {
+ return DriverStrategy.FULL_OUTER_MERGE;
+ }
+
+ @Override
+ protected String getNodeName() {
+ return "FullOuterJoin";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeInnerJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeInnerJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeInnerJoinDescriptor.java
new file mode 100644
index 0000000..1c3ea19
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeInnerJoinDescriptor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * Sort-merge descriptor for inner joins ({@code INNER_MERGE}). Plan nodes are named
+ * {@code "Join(<operator name>)"}.
+ */
+public class SortMergeInnerJoinDescriptor extends AbstractSortMergeJoinDescriptor {
+
+ /**
+ * Creates a descriptor with the default shipping options of {@code AbstractJoinDescriptor}.
+ *
+ * @param keys1 join key fields of the first input
+ * @param keys2 join key fields of the second input
+ */
+ public SortMergeInnerJoinDescriptor(FieldList keys1, FieldList keys2) {
+ super(keys1, keys2);
+ }
+
+ /**
+ * Creates a descriptor with explicit shipping options (used to honor join hints).
+ *
+ * @param keys1 join key fields of the first input
+ * @param keys2 join key fields of the second input
+ * @param broadcastFirstAllowed whether the first input may be broadcast
+ * @param broadcastSecondAllowed whether the second input may be broadcast
+ * @param repartitionAllowed whether both inputs may be repartitioned
+ */
+ public SortMergeInnerJoinDescriptor(FieldList keys1, FieldList keys2,
+ boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed) {
+ super(keys1, keys2, broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed);
+ }
+
+ @Override
+ public DriverStrategy getStrategy() {
+ return DriverStrategy.INNER_MERGE;
+ }
+
+ @Override
+ protected String getNodeName() {
+ return "Join";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
deleted file mode 100644
index 3ab0aa7..0000000
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.TwoInputNode;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.util.Utils;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
- *
- */
-public class SortMergeJoinDescriptor extends AbstractJoinDescriptor {
-
- public SortMergeJoinDescriptor(FieldList keys1, FieldList keys2) {
- super(keys1, keys2);
- }
-
- public SortMergeJoinDescriptor(FieldList keys1, FieldList keys2,
- boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed)
- {
- super(keys1, keys2, broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed);
- }
-
- @Override
- public DriverStrategy getStrategy() {
- return DriverStrategy.INNER_MERGE;
- }
-
- @Override
- protected List<LocalPropertiesPair> createPossibleLocalProperties() {
- RequestedLocalProperties sort1 = new RequestedLocalProperties(Utils.createOrdering(this.keys1));
- RequestedLocalProperties sort2 = new RequestedLocalProperties(Utils.createOrdering(this.keys2));
- return Collections.singletonList(new LocalPropertiesPair(sort1, sort2));
- }
-
- @Override
- public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
- LocalProperties produced1, LocalProperties produced2)
- {
- int numRelevantFields = this.keys1.size();
-
- Ordering prod1 = produced1.getOrdering();
- Ordering prod2 = produced2.getOrdering();
-
- if (prod1 == null || prod2 == null) {
- throw new CompilerException("The given properties do not meet this operators requirements.");
- }
-
- // check that order of fields is equivalent
- if (!checkEquivalentFieldPositionsInKeyFields(
- prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) {
- return false;
- }
-
- // check that both inputs have the same directions of order
- for (int i = 0; i < numRelevantFields; i++) {
- if (prod1.getOrder(i) != prod2.getOrder(i)) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
- boolean[] inputOrders = in1.getLocalProperties().getOrdering().getFieldSortDirections();
-
- if (inputOrders == null || inputOrders.length < this.keys1.size()) {
- throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a merge operator.");
- } else if (inputOrders.length > this.keys1.size()) {
- boolean[] tmp = new boolean[this.keys1.size()];
- System.arraycopy(inputOrders, 0, tmp, 0, tmp.length);
- inputOrders = tmp;
- }
-
- return new DualInputPlanNode(node, "Join(" + node.getOperator().getName() + ")", in1, in2, DriverStrategy.INNER_MERGE, this.keys1, this.keys2, inputOrders);
- }
-
- @Override
- public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
- LocalProperties comb = LocalProperties.combine(in1, in2);
- return comb.clearUniqueFieldSets();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeLeftOuterJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeLeftOuterJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeLeftOuterJoinDescriptor.java
new file mode 100644
index 0000000..8193960
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeLeftOuterJoinDescriptor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * Sort-merge descriptor for left outer joins ({@code LEFT_OUTER_MERGE}).
+ *
+ * <p>The first (left, preserved) input is never broadcast, because each unmatched left record
+ * must be emitted exactly once. Only the second input may be broadcast, if permitted.
+ * Repartitioning is always allowed.
+ */
+public class SortMergeLeftOuterJoinDescriptor extends AbstractSortMergeJoinDescriptor {
+
+ /**
+ * Creates a descriptor that allows broadcasting the second input.
+ *
+ * @param keys1 join key fields of the first (preserved) input
+ * @param keys2 join key fields of the second input
+ */
+ public SortMergeLeftOuterJoinDescriptor(FieldList keys1, FieldList keys2) {
+ super(keys1, keys2, false, true, true);
+ }
+
+ /**
+ * @param keys1 join key fields of the first (preserved) input
+ * @param keys2 join key fields of the second input
+ * @param broadcastAllowed whether the second input may be broadcast
+ */
+ public SortMergeLeftOuterJoinDescriptor(FieldList keys1, FieldList keys2, boolean broadcastAllowed) {
+ super(keys1, keys2, false, broadcastAllowed, true);
+ }
+
+ @Override
+ public DriverStrategy getStrategy() {
+ return DriverStrategy.LEFT_OUTER_MERGE;
+ }
+
+ @Override
+ protected String getNodeName() {
+ return "LeftOuterJoin";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeRightOuterJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeRightOuterJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeRightOuterJoinDescriptor.java
new file mode 100644
index 0000000..3719d05
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeRightOuterJoinDescriptor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * Sort-merge descriptor for right outer joins ({@code RIGHT_OUTER_MERGE}).
+ *
+ * <p>The second (right, preserved) input is never broadcast, because each unmatched right record
+ * must be emitted exactly once. Only the first input may be broadcast, if permitted.
+ * Repartitioning is always allowed.
+ */
+public class SortMergeRightOuterJoinDescriptor extends AbstractSortMergeJoinDescriptor {
+
+ /**
+ * Creates a descriptor that allows broadcasting the first input.
+ *
+ * @param keys1 join key fields of the first input
+ * @param keys2 join key fields of the second (preserved) input
+ */
+ public SortMergeRightOuterJoinDescriptor(FieldList keys1, FieldList keys2) {
+ super(keys1, keys2, true, false, true);
+ }
+
+ /**
+ * @param keys1 join key fields of the first input
+ * @param keys2 join key fields of the second (preserved) input
+ * @param broadcastAllowed whether the first input may be broadcast
+ */
+ public SortMergeRightOuterJoinDescriptor(FieldList keys1, FieldList keys2, boolean broadcastAllowed) {
+ super(keys1, keys2, broadcastAllowed, false, true);
+ }
+
+ @Override
+ public DriverStrategy getStrategy() {
+ return DriverStrategy.RIGHT_OUTER_MERGE;
+ }
+
+ @Override
+ protected String getNodeName() {
+ return "RightOuterJoin";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
index 7fbdf81..bcdee14 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
@@ -26,15 +26,17 @@ import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Union;
import org.apache.flink.api.common.operators.base.BulkIterationBase;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
+import org.apache.flink.api.common.operators.base.CoGroupRawOperatorBase;
import org.apache.flink.api.common.operators.base.CrossOperatorBase;
import org.apache.flink.api.common.operators.base.DeltaIterationBase;
import org.apache.flink.api.common.operators.base.FilterOperatorBase;
import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase;
import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
@@ -44,6 +46,7 @@ import org.apache.flink.optimizer.dag.BinaryUnionNode;
import org.apache.flink.optimizer.dag.BulkIterationNode;
import org.apache.flink.optimizer.dag.BulkPartialSolutionNode;
import org.apache.flink.optimizer.dag.CoGroupNode;
+import org.apache.flink.optimizer.dag.CoGroupRawNode;
import org.apache.flink.optimizer.dag.CollectorMapNode;
import org.apache.flink.optimizer.dag.CrossNode;
import org.apache.flink.optimizer.dag.DagConnection;
@@ -57,6 +60,7 @@ import org.apache.flink.optimizer.dag.JoinNode;
import org.apache.flink.optimizer.dag.MapNode;
import org.apache.flink.optimizer.dag.MapPartitionNode;
import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.OuterJoinNode;
import org.apache.flink.optimizer.dag.PartitionNode;
import org.apache.flink.optimizer.dag.ReduceNode;
import org.apache.flink.optimizer.dag.SolutionSetNode;
@@ -69,8 +73,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.flink.api.common.operators.base.CoGroupRawOperatorBase;
-import org.apache.flink.optimizer.dag.CoGroupRawNode;
/**
* This traversal creates the optimizer DAG from a program.
@@ -160,8 +162,11 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
else if (c instanceof GroupReduceOperatorBase) {
n = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) c);
}
- else if (c instanceof JoinOperatorBase) {
- n = new JoinNode((JoinOperatorBase<?, ?, ?, ?>) c);
+ else if (c instanceof InnerJoinOperatorBase) {
+ n = new JoinNode((InnerJoinOperatorBase<?, ?, ?, ?>) c);
+ }
+ else if (c instanceof OuterJoinOperatorBase) {
+ n = new OuterJoinNode((OuterJoinOperatorBase<?, ?, ?, ?>) c);
}
else if (c instanceof CoGroupOperatorBase) {
n = new CoGroupNode((CoGroupOperatorBase<?, ?, ?, ?>) c);
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java
index 8c19462..17f0241 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/FeedbackPropertiesMatchTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.operators.OperatorInformation;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.operators.util.FieldSet;
@@ -1431,6 +1431,6 @@ public class FeedbackPropertiesMatchTest {
}
private static JoinNode getJoinNode() {
- return new JoinNode(new JoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>>(new DummyFlatJoinFunction<String>(), new BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
+ return new JoinNode(new InnerJoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>>(new DummyFlatJoinFunction<String>(), new BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
index 00ada2a..321e5ca 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.optimizer;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.operators.util.FieldSet;
@@ -126,7 +126,7 @@ public class SemanticPropertiesAPIToPlanTest extends CompilerTestBase {
oPlan.accept(new Visitor<PlanNode>() {
@Override
public boolean preVisit(PlanNode visitable) {
- if (visitable instanceof DualInputPlanNode && visitable.getProgramOperator() instanceof JoinOperatorBase) {
+ if (visitable instanceof DualInputPlanNode && visitable.getProgramOperator() instanceof InnerJoinOperatorBase) {
DualInputPlanNode node = ((DualInputPlanNode) visitable);
final Channel inConn1 = node.getInput1();
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
index 839f0a1..1d559c2 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
@@ -34,8 +34,8 @@ public class JoinGlobalPropertiesCompatibilityTest {
try {
final FieldList keysLeft = new FieldList(1, 4);
final FieldList keysRight = new FieldList(3, 1);
-
- SortMergeJoinDescriptor descr = new SortMergeJoinDescriptor(keysLeft, keysRight);
+
+ SortMergeInnerJoinDescriptor descr = new SortMergeInnerJoinDescriptor(keysLeft, keysRight);
// test compatible hash partitioning
{
@@ -121,7 +121,7 @@ public class JoinGlobalPropertiesCompatibilityTest {
}
};
- SortMergeJoinDescriptor descr = new SortMergeJoinDescriptor(keysLeft, keysRight);
+ SortMergeInnerJoinDescriptor descr = new SortMergeInnerJoinDescriptor(keysLeft, keysRight);
// test incompatible hash with custom partitioning
{
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 207bc5d..306a15b 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.aggregators.Aggregator
import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat}
import org.apache.flink.api.common.operators.Order
-import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.Utils.CountHelper
@@ -33,6 +33,7 @@ import org.apache.flink.api.java.functions.{FirstReducer, KeySelector}
import org.apache.flink.api.java.io.{DiscardingOutputFormat, PrintingOutputFormat, TextOutputFormat}
import org.apache.flink.api.java.operators.Keys.ExpressionKeys
import org.apache.flink.api.java.operators._
+import org.apache.flink.api.java.operators.join.JoinType
import org.apache.flink.api.java.{DataSet => JavaDataSet, Utils}
import org.apache.flink.api.scala.operators.{ScalaAggregateOperator, ScalaCsvOutputFormat}
import org.apache.flink.configuration.Configuration
@@ -840,11 +841,11 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
/**
* Creates a new DataSet by joining `this` DataSet with the `other` DataSet. To specify the join
- * keys the `where` and `isEqualTo` methods must be used. For example:
+ * keys the `where` and `equalTo` methods must be used. For example:
* {{{
* val left: DataSet[(String, Int, Int)] = ...
* val right: DataSet[(Int, String, Int)] = ...
- * val joined = left.join(right).where(0).isEqualTo(1)
+ * val joined = left.join(right).where(0).equalTo(1)
* }}}
*
* The default join result is a DataSet with 2-Tuples of the joined values. In the above example
@@ -854,7 +855,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
* {{{
* val left: DataSet[(String, Int, Int)] = ...
* val right: DataSet[(Int, String, Int)] = ...
- * val joined = left.join(right).where(0).isEqualTo(1) { (l, r) =>
+ * val joined = left.join(right).where(0).equalTo(1) { (l, r) =>
* (l._1, r._2)
* }
* }}}
@@ -864,7 +865,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
* {{{
* val left: DataSet[(String, Int, Int)] = ...
* val right: DataSet[(Int, String, Int)] = ...
- * val joined = left.join(right).where(0).isEqualTo(1) {
+ * val joined = left.join(right).where(0).equalTo(1) {
* (l, r, out: Collector[(String, Int)]) =>
* if (l._2 > 4) {
* out.collect((l._1, r._3))
@@ -875,29 +876,119 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
* }
* }}}
*/
- def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O] =
- new UnfinishedJoinOperation(this, other, JoinHint.OPTIMIZER_CHOOSES)
+ def join[O](other: DataSet[O]): UnfinishedInnerJoinOperation[T, O] =
+ new UnfinishedInnerJoinOperation(this, other, JoinHint.OPTIMIZER_CHOOSES)
/**
* Special [[join]] operation for explicitly telling the system what join strategy to use. If
* null is given as the join strategy, then the optimizer will pick the strategy.
*/
- def join[O](other: DataSet[O], strategy: JoinHint): UnfinishedJoinOperation[T, O] =
- new UnfinishedJoinOperation(this, other, strategy)
+ def join[O](other: DataSet[O], strategy: JoinHint): UnfinishedInnerJoinOperation[T, O] =
+ new UnfinishedInnerJoinOperation(this, other, strategy)
/**
* Special [[join]] operation for explicitly telling the system that the right side is assumed
* to be a lot smaller than the left side of the join.
*/
- def joinWithTiny[O](other: DataSet[O]): UnfinishedJoinOperation[T, O] =
- new UnfinishedJoinOperation(this, other, JoinHint.BROADCAST_HASH_SECOND)
+ def joinWithTiny[O](other: DataSet[O]): UnfinishedInnerJoinOperation[T, O] =
+ new UnfinishedInnerJoinOperation(this, other, JoinHint.BROADCAST_HASH_SECOND)
/**
* Special [[join]] operation for explicitly telling the system that the left side is assumed
* to be a lot smaller than the right side of the join.
*/
- def joinWithHuge[O](other: DataSet[O]): UnfinishedJoinOperation[T, O] =
- new UnfinishedJoinOperation(this, other, JoinHint.BROADCAST_HASH_FIRST)
+ def joinWithHuge[O](other: DataSet[O]): UnfinishedInnerJoinOperation[T, O] =
+ new UnfinishedInnerJoinOperation(this, other, JoinHint.BROADCAST_HASH_FIRST)
+
+ /**
+ * Creates a new DataSet by performing a full outer join of `this` DataSet
+ * with the `other` DataSet, by combining two elements of two DataSets on
+ * key equality.
+ * Elements of both DataSets that do not have a matching element on the
+ * opposing side are joined with `null` and emitted to the resulting DataSet.
+ *
+ * To specify the join keys the `where` and `equalTo` methods must be used. For example:
+ * {{{
+ * val left: DataSet[(String, Int, Int)] = ...
+ * val right: DataSet[(Int, String, Int)] = ...
+ * val joined = left.fullOuterJoin(right).where(0).equalTo(1)
+ * }}}
+ *
+ * When using an outer join you are required to specify a join function. For example:
+ * {{{
+ * val joined = left.fullOuterJoin(right).where(0).equalTo(1) {
+ * (left, right) =>
+ * val a = if (left == null) null else left._1
+   *       val b = if (right == null) null else right._2
+ * (a, b)
+ * }
+ * }}}
+ */
+ def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O] =
+ new UnfinishedOuterJoinOperation(this, other, JoinHint.OPTIMIZER_CHOOSES, JoinType.FULL_OUTER)
+
+ /**
+ * Special [[fullOuterJoin]] operation for explicitly telling the system what join strategy to
+ * use. If null is given as the join strategy, then the optimizer will pick the strategy.
+ */
+ def fullOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O] =
+ new UnfinishedOuterJoinOperation(this, other, strategy, JoinType.FULL_OUTER)
+
+ /**
+ * An outer join on the left side.
+ *
+ * Elements of the left side (i.e. `this`) that do not have a matching element on the other
+ * side are joined with `null` and emitted to the resulting DataSet.
+ *
+ * @param other The other DataSet with which this DataSet is joined.
+   * @return An [[UnfinishedOuterJoinOperation]] to continue defining the join transformation
+   * @see [[fullOuterJoin]]
+ */
+ def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O] =
+ new UnfinishedOuterJoinOperation(this, other, JoinHint.OPTIMIZER_CHOOSES, JoinType.LEFT_OUTER)
+
+ /**
+ * An outer join on the left side.
+ *
+ * Elements of the left side (i.e. `this`) that do not have a matching element on the other
+ * side are joined with `null` and emitted to the resulting DataSet.
+ *
+ * @param other The other DataSet with which this DataSet is joined.
+   * @param strategy The strategy that should be used to execute the join. If `null` is given,
+   *                 then the optimizer will pick the join strategy.
+   * @return An [[UnfinishedOuterJoinOperation]] to continue defining the join transformation
+   * @see [[fullOuterJoin]]
+ */
+ def leftOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O] =
+ new UnfinishedOuterJoinOperation(this, other, strategy, JoinType.LEFT_OUTER)
+
+ /**
+ * An outer join on the right side.
+ *
+ * Elements of the right side (i.e. `other`) that do not have a matching element on `this`
+ * side are joined with `null` and emitted to the resulting DataSet.
+ *
+ * @param other The other DataSet with which this DataSet is joined.
+   * @return An [[UnfinishedOuterJoinOperation]] to continue defining the join transformation
+   * @see [[fullOuterJoin]]
+ */
+ def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O] =
+ new UnfinishedOuterJoinOperation(this, other, JoinHint.OPTIMIZER_CHOOSES, JoinType.RIGHT_OUTER)
+
+ /**
+ * An outer join on the right side.
+ *
+ * Elements of the right side (i.e. `other`) that do not have a matching element on `this`
+ * side are joined with `null` and emitted to the resulting DataSet.
+ *
+ * @param other The other DataSet with which this DataSet is joined.
+   * @param strategy The strategy that should be used to execute the join. If `null` is given,
+   *                 then the optimizer will pick the join strategy.
+   * @return An [[UnfinishedOuterJoinOperation]] to continue defining the join transformation
+   * @see [[fullOuterJoin]]
+ */
+ def rightOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O] =
+ new UnfinishedOuterJoinOperation(this, other, strategy, JoinType.RIGHT_OUTER)
// --------------------------------------------------------------------------------------------
// Co-Group
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
index ecc1aab..f57fc25 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
@@ -17,12 +17,15 @@
*/
package org.apache.flink.api.scala
+import org.apache.flink.api.common.operators.Operator
+import org.apache.flink.api.common.operators.base.JoinOperatorBase
import org.apache.flink.api.common.functions.{FlatJoinFunction, JoinFunction, Partitioner, RichFlatJoinFunction}
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin.WrappingFlatJoinFunction
import org.apache.flink.api.java.operators.JoinOperator.EquiJoin
import org.apache.flink.api.java.operators._
+import org.apache.flink.api.java.operators.join.JoinType
import org.apache.flink.util.Collector
import scala.reflect.ClassTag
@@ -60,7 +63,7 @@ class JoinDataSet[L, R](
rightInput: DataSet[R],
leftKeys: Keys[L],
rightKeys: Keys[R])
- extends DataSet(defaultJoin) {
+ extends DataSet(defaultJoin) with JoinFunctionAssigner[L, R] {
private var customPartitioner : Partitioner[_] = _
@@ -84,7 +87,8 @@ class JoinDataSet[L, R](
joiner,
implicitly[TypeInformation[O]],
defaultJoin.getJoinHint,
- getCallLocationName())
+ getCallLocationName(),
+ defaultJoin.getJoinType)
if (customPartitioner != null) {
wrap(joinOperator.withPartitioner(customPartitioner))
@@ -114,7 +118,8 @@ class JoinDataSet[L, R](
joiner,
implicitly[TypeInformation[O]],
defaultJoin.getJoinHint,
- getCallLocationName())
+ getCallLocationName(),
+ defaultJoin.getJoinType)
if (customPartitioner != null) {
wrap(joinOperator.withPartitioner(customPartitioner))
@@ -142,7 +147,8 @@ class JoinDataSet[L, R](
joiner,
implicitly[TypeInformation[O]],
defaultJoin.getJoinHint,
- getCallLocationName())
+ getCallLocationName(),
+ defaultJoin.getJoinType)
if (customPartitioner != null) {
wrap(joinOperator.withPartitioner(customPartitioner))
@@ -171,7 +177,8 @@ class JoinDataSet[L, R](
generatedFunction, fun,
implicitly[TypeInformation[O]],
defaultJoin.getJoinHint,
- getCallLocationName())
+ getCallLocationName(),
+ defaultJoin.getJoinType)
if (customPartitioner != null) {
wrap(joinOperator.withPartitioner(customPartitioner))
@@ -205,9 +212,46 @@ class JoinDataSet[L, R](
}
}
+private[flink] abstract class UnfinishedJoinOperation[L, R, O <: JoinFunctionAssigner[L, R]](
+ leftSet: DataSet[L],
+ rightSet: DataSet[R],
+ val joinHint: JoinHint,
+ val joinType: JoinType)
+ extends UnfinishedKeyPairOperation[L, R, O](leftSet, rightSet) {
+
+ def createJoinFunctionAssigner(leftKey: Keys[L], rightKey: Keys[R]): O
+
+ private[flink] def createDefaultJoin(leftKey: Keys[L], rightKey: Keys[R]) = {
+ val joiner = new FlatJoinFunction[L, R, (L, R)] {
+ def join(left: L, right: R, out: Collector[(L, R)]) = {
+ out.collect((left, right))
+ }
+ }
+ val returnType = createTuple2TypeInformation[L, R](leftInput.getType(), rightInput.getType())
+ val joinOperator = new EquiJoin[L, R, (L, R)](
+ leftSet.javaSet,
+ rightSet.javaSet,
+ leftKey,
+ rightKey,
+ joiner,
+ returnType,
+ joinHint,
+ getCallLocationName(),
+ joinType)
+
+ new JoinDataSet(joinOperator, leftSet, rightSet, leftKey, rightKey)
+ }
+
+ private[flink] def finish(leftKey: Keys[L], rightKey: Keys[R]) = {
+ createJoinFunctionAssigner(leftKey, rightKey)
+ }
+}
+
/**
- * An unfinished join operation that results from [[DataSet.join()]] The keys for the left and right
- * side must be specified using first `where` and then `equalTo`. For example:
+ * An unfinished inner join operation that results from calling [[DataSet.join()]].
+ * The keys for the left and right side must be specified using first `where` and then `equalTo`.
+ *
+ * For example:
*
* {{{
* val left = ...
@@ -217,24 +261,77 @@ class JoinDataSet[L, R](
* @tparam L The type of the left input of the join.
* @tparam R The type of the right input of the join.
*/
-class UnfinishedJoinOperation[L, R](
+class UnfinishedInnerJoinOperation[L, R](
leftSet: DataSet[L],
rightSet: DataSet[R],
- val joinHint: JoinHint)
- extends UnfinishedKeyPairOperation[L, R, JoinDataSet[L, R]](leftSet, rightSet) {
+ joinHint: JoinHint)
+ extends UnfinishedJoinOperation[L, R, JoinDataSet[L, R]](
+ leftSet, rightSet, joinHint, JoinType.INNER) {
- private[flink] def finish(leftKey: Keys[L], rightKey: Keys[R]) = {
- val joiner = new FlatJoinFunction[L, R, (L, R)] {
- def join(left: L, right: R, out: Collector[(L, R)]) = {
- out.collect((left, right))
- }
- }
- val returnType = createTuple2TypeInformation[L, R](leftInput.getType(), rightInput.getType())
- val joinOperator = new EquiJoin[L, R, (L, R)](
- leftSet.javaSet, rightSet.javaSet, leftKey, rightKey, joiner, returnType, joinHint,
- getCallLocationName())
+ override def createJoinFunctionAssigner(leftKey: Keys[L], rightKey: Keys[R]) = {
+ createDefaultJoin(leftKey, rightKey)
+ }
+}
- new JoinDataSet(joinOperator, leftSet, rightSet, leftKey, rightKey)
+/**
+ * An unfinished outer join operation that results from calling, e.g. [[DataSet.fullOuterJoin()]].
+ * The keys for the left and right side must be specified using first `where` and then `equalTo`.
+ *
+ * Note that a join function must always be specified explicitly when constructing an outer join
+ * operator.
+ *
+ * For example:
+ *
+ * {{{
+ * val left = ...
+ * val right = ...
+ * val joinResult = left.fullOuterJoin(right).where(...).equalTo(...) {
+ * (first, second) => ...
+ * }
+ * }}}
+ * @tparam L The type of the left input of the join.
+ * @tparam R The type of the right input of the join.
+ */
+class UnfinishedOuterJoinOperation[L, R](
+ leftSet: DataSet[L],
+ rightSet: DataSet[R],
+ joinHint: JoinHint,
+ joinType: JoinType)
+ extends UnfinishedJoinOperation[L, R, JoinFunctionAssigner[L, R]](
+ leftSet, rightSet, joinHint, joinType) {
+
+ override def createJoinFunctionAssigner(leftKey: Keys[L], rightKey: Keys[R]):
+ JoinFunctionAssigner[L, R] = {
+ new DefaultJoinFunctionAssigner(createDefaultJoin(leftKey, rightKey))
+ }
+
+ private class DefaultJoinFunctionAssigner(val defaultJoin: JoinDataSet[L, R])
+ extends JoinFunctionAssigner[L, R] {
+
+ override def withPartitioner[K: TypeInformation](part: Partitioner[K]) =
+ defaultJoin.withPartitioner(part)
+
+ override def apply[O: TypeInformation : ClassTag](fun: (L, R) => O) =
+ defaultJoin.apply(fun)
+
+ override def apply[O: TypeInformation : ClassTag](fun: (L, R, Collector[O]) => Unit) =
+ defaultJoin.apply(fun)
+
+ override def apply[O: TypeInformation : ClassTag](fun: FlatJoinFunction[L, R, O]) =
+ defaultJoin.apply(fun)
+
+ override def apply[O: TypeInformation : ClassTag](fun: JoinFunction[L, R, O]) =
+ defaultJoin.apply(fun)
}
}
+
+trait JoinFunctionAssigner[L, R] {
+
+ def withPartitioner[K : TypeInformation](part : Partitioner[K]) : JoinFunctionAssigner[L, R]
+ def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
+ def apply[O: TypeInformation: ClassTag](fun: (L, R, Collector[O]) => Unit): DataSet[O]
+ def apply[O: TypeInformation: ClassTag](fun: FlatJoinFunction[L, R, O]): DataSet[O]
+ def apply[O: TypeInformation: ClassTag](fun: JoinFunction[L, R, O]): DataSet[O]
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b00c1d7e/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index 566573e..7605b3a 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -29,7 +29,6 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.junit.Assert;
public class TestEnvironment extends ExecutionEnvironment {
@@ -60,21 +59,13 @@ public class TestEnvironment extends ExecutionEnvironment {
@Override
public JobExecutionResult execute(String jobName) throws Exception {
- try {
- OptimizedPlan op = compileProgram(jobName);
+ OptimizedPlan op = compileProgram(jobName);
- JobGraphGenerator jgg = new JobGraphGenerator();
- JobGraph jobGraph = jgg.compileJobGraph(op);
-
- this.lastJobExecutionResult = executor.submitJobAndWait(jobGraph, false);
- return this.lastJobExecutionResult;
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- Assert.fail("Job execution failed!");
- return null;
- }
+ JobGraphGenerator jgg = new JobGraphGenerator();
+ JobGraph jobGraph = jgg.compileJobGraph(op);
+
+ this.lastJobExecutionResult = executor.submitJobAndWait(jobGraph, false);
+ return this.lastJobExecutionResult;
}