You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/09 16:20:55 UTC
[6/6] flink git commit: [FLINK-2576] Add outer join base operator.
[FLINK-2576] Add outer join base operator.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b222276
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b222276
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b222276
Branch: refs/heads/master
Commit: 6b2222762fc38d84b31170216d6b6ae0c272af9b
Parents: 0455857
Author: r-pogalz <r....@campus.tu-berlin.de>
Authored: Tue Jul 7 21:40:04 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Oct 9 16:19:21 2015 +0200
----------------------------------------------------------------------
.../operators/base/OuterJoinOperatorBase.java | 314 +++++++++++++++++++
.../base/OuterJoinOperatorBaseTest.java | 150 +++++++++
.../runtime/operators/FullOuterJoinDriver.java | 2 +-
.../runtime/operators/LeftOuterJoinDriver.java | 2 +-
.../runtime/operators/RightOuterJoinDriver.java | 2 +-
.../sort/AbstractMergeOuterJoinIterator.java | 3 +-
.../sort/NonReusingMergeOuterJoinIterator.java | 1 +
.../sort/ReusingMergeOuterJoinIterator.java | 1 +
...bstractSortMergeOuterJoinIteratorITCase.java | 2 +-
...ReusingSortMergeOuterJoinIteratorITCase.java | 2 +-
...ReusingSortMergeOuterJoinIteratorITCase.java | 2 +-
11 files changed, 473 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
new file mode 100644
index 0000000..7666d10
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import org.apache.commons.collections.ResettableIterator;
+import org.apache.commons.collections.iterators.ListIteratorWrapper;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN2, OUT>> extends AbstractJoinOperatorBase<IN1, IN2, OUT, FT> {
+ // Base operator for outer joins. executeOnCollections() evaluates the join on in-memory lists using a sort-merge strategy.
+ public static enum OuterJoinType {LEFT, RIGHT, FULL} // LEFT/RIGHT keep only key groups that have a record on that side; FULL keeps every key group
+
+ private OuterJoinType outerJoinType; // mutable via setOuterJoinType(); read each time executeOnCollections() runs
+ /** Creates the operator from a user function that is already wrapped in a UserCodeWrapper. */
+ public OuterJoinOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
+ int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) {
+ super(udf, operatorInfo, keyPositions1, keyPositions2, name);
+ this.outerJoinType = outerJoinType;
+ }
+ /** Creates the operator from a function instance, which is wrapped in a UserCodeObjectWrapper. */
+ public OuterJoinOperatorBase(FT udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
+ int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) {
+ super(new UserCodeObjectWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name);
+ this.outerJoinType = outerJoinType;
+ }
+ /** Creates the operator from a function class, which is wrapped in a UserCodeClassWrapper. */
+ public OuterJoinOperatorBase(Class<? extends FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
+ int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) {
+ super(new UserCodeClassWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name);
+ this.outerJoinType = outerJoinType;
+ }
+ /** Replaces the outer join type that is used by subsequent calls to executeOnCollections(). */
+ public void setOuterJoinType(OuterJoinType outerJoinType) {
+ this.outerJoinType = outerJoinType;
+ }
+ /** Returns the configured outer join type; may be null, since neither the constructors nor the setter reject null. */
+ public OuterJoinType getOuterJoinType() {
+ return outerJoinType;
+ }
+ /** Evaluates the outer join on in-memory lists: sorts both input lists by key (in place), merges their key groups and calls the join function for every left/right pairing; the missing side of an unmatched group is passed as null. The function is opened before and closed after the run. */
+ @Override
+ protected List<OUT> executeOnCollections(List<IN1> leftInput, List<IN2> rightInput, RuntimeContext runtimeContext, ExecutionConfig executionConfig) throws Exception {
+ TypeInformation<IN1> leftInformation = getOperatorInfo().getFirstInputType();
+ TypeInformation<IN2> rightInformation = getOperatorInfo().getSecondInputType();
+ TypeInformation<OUT> outInformation = getOperatorInfo().getOutputType();
+ // the comparators drive sorting, grouping and key matching; the serializers are used to copy records
+ TypeComparator<IN1> leftComparator = buildComparatorFor(0, executionConfig, leftInformation);
+ TypeComparator<IN2> rightComparator = buildComparatorFor(1, executionConfig, rightInformation);
+
+ TypeSerializer<IN1> leftSerializer = leftInformation.createSerializer(executionConfig);
+ TypeSerializer<IN2> rightSerializer = rightInformation.createSerializer(executionConfig);
+
+ OuterJoinListIterator<IN1, IN2> outerJoinIterator =
+ new OuterJoinListIterator<>(leftInput, leftSerializer, leftComparator,
+ rightInput, rightSerializer, rightComparator, outerJoinType);
+
+ // --------------------------------------------------------------------
+ // Run UDF
+ // --------------------------------------------------------------------
+ FlatJoinFunction<IN1, IN2, OUT> function = userFunction.getUserCodeObject();
+
+ FunctionUtils.setFunctionRuntimeContext(function, runtimeContext);
+ FunctionUtils.openFunction(function, this.parameters);
+
+
+ List<OUT> result = new ArrayList<>();
+ Collector<OUT> collector = new CopyingListCollector<>(result, outInformation.createSerializer(executionConfig));
+ // the join function receives serializer copies rather than the input objects themselves; null marks the missing side of an unmatched group
+ while (outerJoinIterator.next()) {
+ IN1 left = outerJoinIterator.getLeft();
+ IN2 right = outerJoinIterator.getRight();
+ function.join(left == null ? null : leftSerializer.copy(left), right == null ? null : rightSerializer.copy(right), collector);
+ }
+ FunctionUtils.closeFunction(function); // counterpart of openFunction() above, so that rich functions get their close() call
+ return result;
+ }
+ /** Builds the key comparator for input 0 (left) or 1 (right) with all order flags set to true; throws a RuntimeException for types that are neither atomic nor composite. */
+ @SuppressWarnings("unchecked")
+ private <T> TypeComparator<T> buildComparatorFor(int input, ExecutionConfig executionConfig, TypeInformation<T> typeInformation) {
+ TypeComparator<T> comparator;
+ if (typeInformation instanceof AtomicType) {
+ comparator = ((AtomicType<T>) typeInformation).createComparator(true, executionConfig);
+ } else if (typeInformation instanceof CompositeType) {
+ int[] keyPositions = getKeyColumns(input);
+ boolean[] orders = new boolean[keyPositions.length];
+ Arrays.fill(orders, true);
+
+ comparator = ((CompositeType<T>) typeInformation).createComparator(keyPositions, orders, 0, executionConfig);
+ } else {
+ throw new RuntimeException("Type information for input of type " + typeInformation.getClass()
+ .getCanonicalName() + " is not supported. Could not generate a comparator.");
+ }
+ return comparator;
+ }
+ /** Merge iterator over two key-sorted lists that yields one (left, right) pairing per successful call to next(). */
+ private static class OuterJoinListIterator<IN1, IN2> {
+ // State carried between two nextGroups() calls: which input still holds an unconsumed key group, or which input has run out of groups.
+
+ private static enum MatchStatus {
+ NONE_REMAINED, FIRST_REMAINED, SECOND_REMAINED, FIRST_EMPTY, SECOND_EMPTY
+ }
+
+ private OuterJoinType outerJoinType;
+ // key-grouped views of the sorted inputs, plus the value subsets and iterators of the group pair that is currently being joined
+ private ListKeyGroupedIterator<IN1> leftGroupedIterator;
+ private ListKeyGroupedIterator<IN2> rightGroupedIterator;
+ private Iterable<IN1> currLeftSubset;
+ private ResettableIterator currLeftIterator;
+ private Iterable<IN2> currRightSubset;
+ private ResettableIterator currRightIterator;
+
+ private MatchStatus matchStatus;
+ private GenericPairComparator<IN1, IN2> pairComparator;
+ // pairing produced by the most recent successful next() call
+ private IN1 leftReturn;
+ private IN2 rightReturn;
+ /** Sets up the grouped iterators and sorts both input lists in place with their comparators (the caller's lists are reordered). */
+ public OuterJoinListIterator(List<IN1> leftInput, TypeSerializer<IN1> leftSerializer, final TypeComparator<IN1> leftComparator,
+ List<IN2> rightInput, TypeSerializer<IN2> rightSerializer, final TypeComparator<IN2> rightComparator,
+ OuterJoinType outerJoinType) {
+ this.outerJoinType = outerJoinType;
+ pairComparator = new GenericPairComparator<>(leftComparator, rightComparator);
+ leftGroupedIterator = new ListKeyGroupedIterator<>(leftInput, leftSerializer, leftComparator);
+ rightGroupedIterator = new ListKeyGroupedIterator<>(rightInput, rightSerializer, rightComparator);
+ // ----------------------------------------------------------------
+ // Sort
+ // ----------------------------------------------------------------
+ Collections.sort(leftInput, new Comparator<IN1>() {
+ @Override
+ public int compare(IN1 o1, IN1 o2) {
+ return leftComparator.compare(o1, o2);
+ }
+ });
+
+ Collections.sort(rightInput, new Comparator<IN2>() {
+ @Override
+ public int compare(IN2 o1, IN2 o2) {
+ return rightComparator.compare(o1, o2);
+ }
+ });
+
+ }
+ /** Advances to the next pairing: first through the cross product of the current key groups, then on to the next groups; returns false once no further groups exist. */
+ @SuppressWarnings("unchecked")
+ private boolean next() throws IOException {
+ boolean hasMoreElements;
+ if ((currLeftIterator == null || !currLeftIterator.hasNext()) && (currRightIterator == null || !currRightIterator.hasNext())) {
+ hasMoreElements = nextGroups(outerJoinType);
+ if (hasMoreElements) {
+ if (outerJoinType != OuterJoinType.LEFT) { // for LEFT, nextGroups(LEFT) has already created and reset this iterator
+ currLeftIterator = new ListIteratorWrapper(currLeftSubset.iterator());
+ }
+ leftReturn = (IN1) currLeftIterator.next();
+ if (outerJoinType != OuterJoinType.RIGHT) { // for RIGHT, nextGroups(RIGHT) has already created and reset this iterator
+ currRightIterator = new ListIteratorWrapper(currRightSubset.iterator());
+ }
+ rightReturn = (IN2) currRightIterator.next();
+ return true;
+ } else {
+ // no more groups on either input
+ return false;
+ }
+ } else if (currLeftIterator.hasNext() && !currRightIterator.hasNext()) { // right group consumed for this left record: advance left, restart right
+ leftReturn = (IN1) currLeftIterator.next();
+ currRightIterator.reset();
+ rightReturn = (IN2) currRightIterator.next();
+ return true;
+ } else { // more right records remain for the current left record
+ rightReturn = (IN2) currRightIterator.next();
+ return true;
+ }
+ }
+ /** Moves to the next pair of key groups relevant for the given join type: LEFT skips groups without a left record, RIGHT skips groups without a right record, FULL takes every group; any other value (including null) is rejected. */
+ private boolean nextGroups(OuterJoinType outerJoinType) throws IOException {
+ if (outerJoinType == OuterJoinType.FULL) {
+ return nextGroups();
+ } else if (outerJoinType == OuterJoinType.LEFT) {
+ boolean leftContainsElements = false;
+ while (!leftContainsElements && nextGroups()) {
+ currLeftIterator = new ListIteratorWrapper(currLeftSubset.iterator());
+ if (currLeftIterator.next() != null) { // null is the placeholder from Collections.singleton(null), i.e. no left record for this key
+ leftContainsElements = true;
+ }
+ currLeftIterator.reset();
+ }
+ return leftContainsElements;
+ } else if (outerJoinType == OuterJoinType.RIGHT) {
+ boolean rightContainsElements = false;
+ while (!rightContainsElements && nextGroups()) {
+ currRightIterator = new ListIteratorWrapper(currRightSubset.iterator());
+ if (currRightIterator.next() != null) { // null is the placeholder from Collections.singleton(null), i.e. no right record for this key
+ rightContainsElements = true;
+ }
+ currRightIterator.reset();
+ }
+ return rightContainsElements;
+ } else {
+ throw new IllegalArgumentException("Outer join of type '" + outerJoinType + "' not supported.");
+ }
+ }
+ /** Performs one merge step and fills currLeftSubset/currRightSubset; a side that has no record for the chosen key becomes a singleton containing null. Returns false when both inputs have no groups left. */
+ private boolean nextGroups() throws IOException {
+ boolean firstEmpty = true;
+ boolean secondEmpty = true;
+
+ if (this.matchStatus != MatchStatus.FIRST_EMPTY) {
+ if (this.matchStatus == MatchStatus.FIRST_REMAINED) {
+ // left group was held back in the previous step; the pair comparator still references it
+ firstEmpty = false;
+ } else {
+ if (this.leftGroupedIterator.nextKey()) {
+ this.pairComparator.setReference(leftGroupedIterator.getValues().getCurrent());
+ firstEmpty = false;
+ }
+ }
+ }
+
+ if (this.matchStatus != MatchStatus.SECOND_EMPTY) {
+ if (this.matchStatus == MatchStatus.SECOND_REMAINED) { // right group was held back in the previous step
+ secondEmpty = false;
+ } else {
+ if (rightGroupedIterator.nextKey()) {
+ secondEmpty = false;
+ }
+ }
+ }
+
+ if (firstEmpty && secondEmpty) {
+ // both inputs are empty
+ return false;
+ } else if (firstEmpty && !secondEmpty) {
+ // input1 is empty, input2 not
+ this.currLeftSubset = Collections.singleton(null);
+ this.currRightSubset = this.rightGroupedIterator.getValues();
+ this.matchStatus = MatchStatus.FIRST_EMPTY;
+ return true;
+ } else if (!firstEmpty && secondEmpty) {
+ // input1 is not empty, input 2 is empty
+ this.currLeftSubset = this.leftGroupedIterator.getValues();
+ this.currRightSubset = Collections.singleton(null);
+ this.matchStatus = MatchStatus.SECOND_EMPTY;
+ return true;
+ } else {
+ // both inputs are not empty
+ final int comp = this.pairComparator.compareToReference(rightGroupedIterator.getValues().getCurrent());
+ // the sign of comp decides which side's key group is emitted in this step; the other group is kept for the next step
+ if (0 == comp) {
+ // keys match
+ this.currLeftSubset = this.leftGroupedIterator.getValues();
+ this.currRightSubset = this.rightGroupedIterator.getValues();
+ this.matchStatus = MatchStatus.NONE_REMAINED;
+ } else if (0 < comp) {
+ // key1 goes first
+ this.currLeftSubset = this.leftGroupedIterator.getValues();
+ this.currRightSubset = Collections.singleton(null);
+ this.matchStatus = MatchStatus.SECOND_REMAINED;
+ } else {
+ // key 2 goes first
+ this.currLeftSubset = Collections.singleton(null);
+ this.currRightSubset = this.rightGroupedIterator.getValues();
+ this.matchStatus = MatchStatus.FIRST_REMAINED;
+ }
+ return true;
+ }
+ }
+ /** Returns the left record of the current pairing, or null when the current group has no left record. */
+ private IN1 getLeft() {
+ return leftReturn;
+ }
+ /** Returns the right record of the current pairing, or null when the current group has no right record. */
+ private IN2 getRight() {
+ return rightReturn;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
new file mode 100644
index 0000000..679e4ce
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+@SuppressWarnings("serial")
+public class OuterJoinOperatorBaseTest implements Serializable {
+ // join function that renders every pairing as "<left>,<right>"; String.valueOf turns a missing (null) side into the text "null"
+ private final FlatJoinFunction<String, String, String> joiner = new FlatJoinFunction<String, String, String>() {
+ @Override
+ public void join(String first, String second, Collector<String> out) throws Exception {
+ out.collect(Joiner.on(',').join(String.valueOf(first), String.valueOf(second)));
+ }
+ };
+ // operator under test: String inputs and output, empty key position arrays; the join type starts as null and is set by each test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private final OuterJoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>> baseOperator =
+ new OuterJoinOperatorBase(joiner,
+ new BinaryOperatorInformation(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO), new int[0], new int[0], "TestJoiner", null);
+
+ @Test
+ public void testFullOuterJoinWithoutMatchingPartners() throws Exception {
+ final List<String> leftInput = Arrays.asList("foo", "bar", "foobar");
+ final List<String> rightInput = Arrays.asList("oof", "rab", "raboof");
+ baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
+ List<String> expected = Arrays.asList("bar,null", "foo,null", "foobar,null", "null,oof", "null,rab", "null,raboof");
+ testOuterJoin(leftInput, rightInput, expected);
+ }
+
+ @Test
+ public void testFullOuterJoinWithFullMatchingKeys() throws Exception {
+ final List<String> leftInput = Arrays.asList("foo", "bar", "foobar");
+ final List<String> rightInput = Arrays.asList("bar", "foobar", "foo");
+ baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
+ List<String> expected = Arrays.asList("bar,bar", "foo,foo", "foobar,foobar");
+ testOuterJoin(leftInput, rightInput, expected);
+ }
+
+ @Test
+ public void testFullOuterJoinWithEmptyLeftInput() throws Exception {
+ final List<String> leftInput = Arrays.asList();
+ final List<String> rightInput = Arrays.asList("foo", "bar", "foobar");
+ baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
+ List<String> expected = Arrays.asList("null,bar", "null,foo", "null,foobar");
+ testOuterJoin(leftInput, rightInput, expected);
+ }
+
+ @Test
+ public void testFullOuterJoinWithEmptyRightInput() throws Exception {
+ final List<String> leftInput = Arrays.asList("foo", "bar", "foobar");
+ final List<String> rightInput = Arrays.asList();
+ baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
+ List<String> expected = Arrays.asList("bar,null", "foo,null", "foobar,null");
+ testOuterJoin(leftInput, rightInput, expected);
+ }
+
+ @Test
+ public void testFullOuterJoinWithPartialMatchingKeys() throws Exception {
+ final List<String> leftInput = Arrays.asList("foo", "bar", "foobar");
+ final List<String> rightInput = Arrays.asList("bar", "foo", "barfoo");
+ baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
+ List<String> expected = Arrays.asList("bar,bar", "null,barfoo", "foo,foo", "foobar,null");
+ testOuterJoin(leftInput, rightInput, expected);
+ }
+ // duplicate keys: every left record is expected to be paired with every right record of the same key (2x3 "bar", 3x2 "foo")
+ @Test
+ public void testFullOuterJoinBuildingCorrectCrossProducts() throws Exception {
+ final List<String> leftInput = Arrays.asList("foo", "foo", "foo", "bar","bar", "foobar", "foobar");
+ final List<String> rightInput = Arrays.asList("foo", "foo", "bar", "bar", "bar", "barfoo", "barfoo");
+ baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
+ List<String> expected = Arrays.asList("bar,bar", "bar,bar", "bar,bar", "bar,bar", "bar,bar", "bar,bar",
+ "null,barfoo", "null,barfoo", "foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo",
+ "foobar,null", "foobar,null");
+ testOuterJoin(leftInput, rightInput, expected);
+ }
+ // same inputs as the cross-product test; the right-only key "barfoo" must not appear in the result
+ @Test
+ public void testLeftOuterJoin() throws Exception {
+ final List<String> leftInput = Arrays.asList("foo", "foo", "foo", "bar","bar", "foobar", "foobar");
+ final List<String> rightInput = Arrays.asList("foo", "foo", "bar", "bar", "bar", "barfoo", "barfoo");
+ baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.LEFT);
+ List<String> expected = Arrays.asList("bar,bar", "bar,bar", "bar,bar", "bar,bar", "bar,bar", "bar,bar",
+ "foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo", "foobar,null", "foobar,null");
+ testOuterJoin(leftInput, rightInput, expected);
+ }
+ // same inputs as the cross-product test; the left-only key "foobar" must not appear in the result
+ @Test
+ public void testRightOuterJoin() throws Exception {
+ final List<String> leftInput = Arrays.asList("foo", "foo", "foo", "bar","bar", "foobar", "foobar");
+ final List<String> rightInput = Arrays.asList("foo", "foo", "bar", "bar", "bar", "barfoo", "barfoo");
+ baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.RIGHT);
+ List<String> expected = Arrays.asList("bar,bar", "bar,bar", "bar,bar", "bar,bar", "bar,bar", "bar,bar",
+ "null,barfoo", "null,barfoo", "foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo");
+ testOuterJoin(leftInput, rightInput, expected);
+ }
+ // executing with a null join type is expected to fail with an IllegalArgumentException
+ @Test(expected = IllegalArgumentException.class)
+ public void testThatExceptionIsThrownForOuterJoinTypeNull() throws Exception {
+ final List<String> leftInput = Arrays.asList("foo", "bar", "foobar");
+ final List<String> rightInput = Arrays.asList("bar", "foobar", "foo");
+
+ baseOperator.setOuterJoinType(null);
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.disableObjectReuse();
+ baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig);
+ }
+ /** Runs the join twice on the same lists, first with object reuse disabled and then enabled, and expects the given result both times. */
+ private void testOuterJoin(List<String> leftInput, List<String> rightInput, List<String> expected) throws Exception {
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.disableObjectReuse();
+ List<String> resultSafe = baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig);
+ executionConfig.enableObjectReuse();
+ List<String> resultRegular = baseOperator.executeOnCollections(leftInput, rightInput, null, executionConfig);
+
+ assertEquals(expected, resultSafe);
+ assertEquals(expected, resultRegular);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
index 30786aa..d942b72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
@@ -18,12 +18,12 @@
package org.apache.flink.runtime.operators;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
index 3cccab8..ae05d1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
@@ -18,12 +18,12 @@
package org.apache.flink.runtime.operators;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
index c93637e..6fc8abd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
@@ -18,12 +18,12 @@
package org.apache.flink.runtime.operators;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
import org.apache.flink.runtime.operators.util.JoinTaskIterator;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
index d109cf8..74faeb3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.operators.sort;
import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -37,8 +38,6 @@ import java.util.Iterator;
*/
public abstract class AbstractMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> {
- public enum OuterJoinType {LEFT, RIGHT, FULL}
-
private final OuterJoinType outerJoinType;
private boolean initialized = false;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
index db47f16..f2faa2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.operators.sort;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
index 8382b86..33d72d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.operators.sort;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
index 0c0e836..7b27fa9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.sort;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
import org.apache.flink.api.common.typeutils.GenericPairComparator;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
@@ -37,7 +38,6 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
import org.apache.flink.runtime.operators.testutils.CollectionIterator;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
index 7272595..e930317 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
@@ -18,13 +18,13 @@
package org.apache.flink.runtime.operators.sort;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
index 2cec393..cca1b76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
@@ -18,13 +18,13 @@
package org.apache.flink.runtime.operators.sort;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.Test;