You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/09 16:20:55 UTC

[6/6] flink git commit: [FLINK-2576] Add outer join base operator.

[FLINK-2576] Add outer join base operator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b222276
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b222276
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b222276

Branch: refs/heads/master
Commit: 6b2222762fc38d84b31170216d6b6ae0c272af9b
Parents: 0455857
Author: r-pogalz <r....@campus.tu-berlin.de>
Authored: Tue Jul 7 21:40:04 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Oct 9 16:19:21 2015 +0200

----------------------------------------------------------------------
 .../operators/base/OuterJoinOperatorBase.java   | 314 +++++++++++++++++++
 .../base/OuterJoinOperatorBaseTest.java         | 150 +++++++++
 .../runtime/operators/FullOuterJoinDriver.java  |   2 +-
 .../runtime/operators/LeftOuterJoinDriver.java  |   2 +-
 .../runtime/operators/RightOuterJoinDriver.java |   2 +-
 .../sort/AbstractMergeOuterJoinIterator.java    |   3 +-
 .../sort/NonReusingMergeOuterJoinIterator.java  |   1 +
 .../sort/ReusingMergeOuterJoinIterator.java     |   1 +
 ...bstractSortMergeOuterJoinIteratorITCase.java |   2 +-
 ...ReusingSortMergeOuterJoinIteratorITCase.java |   2 +-
 ...ReusingSortMergeOuterJoinIteratorITCase.java |   2 +-
 11 files changed, 473 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
new file mode 100644
index 0000000..7666d10
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBase.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+import org.apache.commons.collections.ResettableIterator;
+import org.apache.commons.collections.iterators.ListIteratorWrapper;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.CopyingListCollector;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.util.ListKeyGroupedIterator;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Base operator for outer joins. Depending on the configured {@link OuterJoinType}, records of
+ * the left input, the right input, or both inputs that have no join partner are still handed to
+ * the user function, with {@code null} in place of the missing partner.
+ *
+ * <p>{@link #executeOnCollections} evaluates the join on in-memory lists by sorting both inputs
+ * on their keys and merging the resulting key groups.
+ *
+ * @param <IN1> type of the left (first) input
+ * @param <IN2> type of the right (second) input
+ * @param <OUT> type of the join result
+ * @param <FT> type of the user-defined join function
+ */
+public class OuterJoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN2, OUT>> extends AbstractJoinOperatorBase<IN1, IN2, OUT, FT> {
+
+	/** Selects the input(s) whose records are kept even when no partner with the same key exists. */
+	public static enum OuterJoinType {LEFT, RIGHT, FULL}
+
+	private OuterJoinType outerJoinType;
+
+	/**
+	 * Creates an outer join operator from an already wrapped user function.
+	 *
+	 * @param udf wrapper around the user-defined join function
+	 * @param operatorInfo input and output type information of the operator
+	 * @param keyPositions1 key positions of the first input
+	 * @param keyPositions2 key positions of the second input
+	 * @param name name of the operator
+	 * @param outerJoinType kind of outer join to perform
+	 */
+	public OuterJoinOperatorBase(UserCodeWrapper<FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
+			int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) {
+		super(udf, operatorInfo, keyPositions1, keyPositions2, name);
+		this.outerJoinType = outerJoinType;
+	}
+
+	/**
+	 * Creates an outer join operator from a user function instance, which is wrapped in a
+	 * {@link UserCodeObjectWrapper}.
+	 *
+	 * @param udf the user-defined join function
+	 * @param operatorInfo input and output type information of the operator
+	 * @param keyPositions1 key positions of the first input
+	 * @param keyPositions2 key positions of the second input
+	 * @param name name of the operator
+	 * @param outerJoinType kind of outer join to perform
+	 */
+	public OuterJoinOperatorBase(FT udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
+			int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) {
+		super(new UserCodeObjectWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name);
+		this.outerJoinType = outerJoinType;
+	}
+
+	/**
+	 * Creates an outer join operator from a user function class, which is wrapped in a
+	 * {@link UserCodeClassWrapper}.
+	 *
+	 * @param udf class of the user-defined join function
+	 * @param operatorInfo input and output type information of the operator
+	 * @param keyPositions1 key positions of the first input
+	 * @param keyPositions2 key positions of the second input
+	 * @param name name of the operator
+	 * @param outerJoinType kind of outer join to perform
+	 */
+	public OuterJoinOperatorBase(Class<? extends FT> udf, BinaryOperatorInformation<IN1, IN2, OUT> operatorInfo,
+			int[] keyPositions1, int[] keyPositions2, String name, OuterJoinType outerJoinType) {
+		super(new UserCodeClassWrapper<FT>(udf), operatorInfo, keyPositions1, keyPositions2, name);
+		this.outerJoinType = outerJoinType;
+	}
+
+	/** Sets the kind of outer join this operator performs. */
+	public void setOuterJoinType(OuterJoinType outerJoinType) {
+		this.outerJoinType = outerJoinType;
+	}
+
+	/** Returns the kind of outer join this operator performs. */
+	public OuterJoinType getOuterJoinType() {
+		return outerJoinType;
+	}
+
+	/**
+	 * Evaluates the outer join on in-memory collections.
+	 *
+	 * <p>Both input lists are sorted in place on their keys, so the caller's lists are reordered
+	 * and must support element replacement. For every pair produced by the merge, copies of the
+	 * left and right record are passed to the user function; the side without a partner is passed
+	 * as {@code null}. After the last pair the collector and the user function are closed.
+	 *
+	 * @param leftInput records of the first input
+	 * @param rightInput records of the second input
+	 * @param runtimeContext runtime context handed to the user function
+	 * @param executionConfig configuration used to create serializers and comparators
+	 * @return all records emitted by the user function, in the order they were emitted
+	 * @throws IllegalArgumentException if the configured outer join type is none of
+	 *         LEFT, RIGHT or FULL (for example {@code null})
+	 * @throws Exception if opening, invoking or closing the user function fails
+	 */
+	@Override
+	protected List<OUT> executeOnCollections(List<IN1> leftInput, List<IN2> rightInput, RuntimeContext runtimeContext, ExecutionConfig executionConfig) throws Exception {
+		TypeInformation<IN1> leftInformation = getOperatorInfo().getFirstInputType();
+		TypeInformation<IN2> rightInformation = getOperatorInfo().getSecondInputType();
+		TypeInformation<OUT> outInformation = getOperatorInfo().getOutputType();
+
+		TypeComparator<IN1> leftComparator = buildComparatorFor(0, executionConfig, leftInformation);
+		TypeComparator<IN2> rightComparator = buildComparatorFor(1, executionConfig, rightInformation);
+
+		TypeSerializer<IN1> leftSerializer = leftInformation.createSerializer(executionConfig);
+		TypeSerializer<IN2> rightSerializer = rightInformation.createSerializer(executionConfig);
+
+		OuterJoinListIterator<IN1, IN2> outerJoinIterator =
+				new OuterJoinListIterator<>(leftInput, leftSerializer, leftComparator,
+						rightInput, rightSerializer, rightComparator, outerJoinType);
+
+		// --------------------------------------------------------------------
+		// Run UDF
+		// --------------------------------------------------------------------
+		FlatJoinFunction<IN1, IN2, OUT> function = userFunction.getUserCodeObject();
+
+		FunctionUtils.setFunctionRuntimeContext(function, runtimeContext);
+		FunctionUtils.openFunction(function, this.parameters);
+
+
+		List<OUT> result = new ArrayList<>();
+		Collector<OUT> collector = new CopyingListCollector<>(result, outInformation.createSerializer(executionConfig));
+
+		while (outerJoinIterator.next()) {
+			IN1 left = outerJoinIterator.getLeft();
+			IN2 right = outerJoinIterator.getRight();
+			// hand copies to the user function; a missing partner stays null
+			function.join(left == null ? null : leftSerializer.copy(left), right == null ? null : rightSerializer.copy(right), collector);
+		}
+
+		// the function was opened above, so it must be closed again once all pairs are processed
+		collector.close();
+		FunctionUtils.closeFunction(function);
+
+		return result;
+	}
+
+	/**
+	 * Creates the key comparator for one of the two inputs.
+	 *
+	 * <p>Atomic types are compared as a whole; composite types are compared on the key columns
+	 * of the given input. In both cases {@code true} is passed as the sort-order flag.
+	 *
+	 * @param input index of the input (0 for the first, 1 for the second)
+	 * @param executionConfig configuration passed on to the comparator factory methods
+	 * @param typeInformation type information of the input
+	 * @return comparator for the keys of the given input
+	 * @throws RuntimeException if the type information is neither atomic nor composite
+	 */
+	@SuppressWarnings("unchecked")
+	private <T> TypeComparator<T> buildComparatorFor(int input, ExecutionConfig executionConfig, TypeInformation<T> typeInformation) {
+		TypeComparator<T> comparator;
+		if (typeInformation instanceof AtomicType) {
+			comparator = ((AtomicType<T>) typeInformation).createComparator(true, executionConfig);
+		} else if (typeInformation instanceof CompositeType) {
+			int[] keyPositions = getKeyColumns(input);
+			boolean[] orders = new boolean[keyPositions.length];
+			Arrays.fill(orders, true);
+
+			comparator = ((CompositeType<T>) typeInformation).createComparator(keyPositions, orders, 0, executionConfig);
+		} else {
+			throw new RuntimeException("Type information for input of type " + typeInformation.getClass()
+					.getCanonicalName() + " is not supported. Could not generate a comparator.");
+		}
+		return comparator;
+	}
+
+	/**
+	 * Merges two key-sorted lists into the sequence of (left, right) pairs of an outer join.
+	 *
+	 * <p>Each call to {@link #next()} advances to the next pair, which is then available through
+	 * {@link #getLeft()} and {@link #getRight()}. Within a pair of key groups every left record is
+	 * combined with every right record; a key group without a partner group is combined with a
+	 * single {@code null}.
+	 */
+	private static class OuterJoinListIterator<IN1, IN2> {
+
+
+		/**
+		 * Outcome of the previous group advance, used to decide which input has to move on:
+		 * {@code FIRST_REMAINED} / {@code SECOND_REMAINED} mean the current key group of that
+		 * input has not been emitted yet, {@code FIRST_EMPTY} / {@code SECOND_EMPTY} mean that
+		 * input is exhausted, and {@code NONE_REMAINED} means both current groups were emitted.
+		 */
+		private static enum MatchStatus {
+			NONE_REMAINED, FIRST_REMAINED, SECOND_REMAINED, FIRST_EMPTY, SECOND_EMPTY
+		}
+
+		private OuterJoinType outerJoinType;
+
+		private ListKeyGroupedIterator<IN1> leftGroupedIterator;
+		private ListKeyGroupedIterator<IN2> rightGroupedIterator;
+		private Iterable<IN1> currLeftSubset;
+		private ResettableIterator currLeftIterator;
+		private Iterable<IN2> currRightSubset;
+		private ResettableIterator currRightIterator;
+
+		private MatchStatus matchStatus;
+		private GenericPairComparator<IN1, IN2> pairComparator;
+
+		private IN1 leftReturn;
+		private IN2 rightReturn;
+
+		/**
+		 * Sets up the merge over the two inputs.
+		 *
+		 * <p>Both lists are sorted in place with their comparators; the key-grouped iterators
+		 * created here read from those same list instances.
+		 */
+		public OuterJoinListIterator(List<IN1> leftInput, TypeSerializer<IN1> leftSerializer, final TypeComparator<IN1> leftComparator,
+				List<IN2> rightInput, TypeSerializer<IN2> rightSerializer, final TypeComparator<IN2> rightComparator,
+				OuterJoinType outerJoinType) {
+			this.outerJoinType = outerJoinType;
+			pairComparator = new GenericPairComparator<>(leftComparator, rightComparator);
+			leftGroupedIterator = new ListKeyGroupedIterator<>(leftInput, leftSerializer, leftComparator);
+			rightGroupedIterator = new ListKeyGroupedIterator<>(rightInput, rightSerializer, rightComparator);
+			// ----------------------------------------------------------------
+			// Sort
+			// ----------------------------------------------------------------
+			Collections.sort(leftInput, new Comparator<IN1>() {
+				@Override
+				public int compare(IN1 o1, IN1 o2) {
+					return leftComparator.compare(o1, o2);
+				}
+			});
+
+			Collections.sort(rightInput, new Comparator<IN2>() {
+				@Override
+				public int compare(IN2 o1, IN2 o2) {
+					return rightComparator.compare(o1, o2);
+				}
+			});
+
+		}
+
+		/**
+		 * Advances to the next (left, right) pair.
+		 *
+		 * @return {@code true} if a pair is available, {@code false} once both inputs are consumed
+		 * @throws IllegalArgumentException if the outer join type is not supported
+		 */
+		@SuppressWarnings("unchecked")
+		private boolean next() throws IOException {
+			boolean hasMoreElements;
+			if ((currLeftIterator == null || !currLeftIterator.hasNext()) && (currRightIterator == null || !currRightIterator.hasNext())) {
+				// the cross product of the current groups is complete (or nothing was read yet)
+				hasMoreElements = nextGroups(outerJoinType);
+				if (hasMoreElements) {
+					// for LEFT / RIGHT joins nextGroups(outerJoinType) has already created and
+					// reset the iterator of the preserved side
+					if (outerJoinType != OuterJoinType.LEFT) {
+						currLeftIterator = new ListIteratorWrapper(currLeftSubset.iterator());
+					}
+					leftReturn = (IN1) currLeftIterator.next();
+					if (outerJoinType != OuterJoinType.RIGHT) {
+						currRightIterator = new ListIteratorWrapper(currRightSubset.iterator());
+					}
+					rightReturn = (IN2) currRightIterator.next();
+					return true;
+				} else {
+					//no more elements
+					return false;
+				}
+			} else if (currLeftIterator.hasNext() && !currRightIterator.hasNext()) {
+				// right group fully paired with the current left record:
+				// move to the next left record and replay the right group from its start
+				leftReturn = (IN1) currLeftIterator.next();
+				currRightIterator.reset();
+				rightReturn = (IN2) currRightIterator.next();
+				return true;
+			} else {
+				// keep the current left record and pair it with the next right record
+				rightReturn = (IN2) currRightIterator.next();
+				return true;
+			}
+		}
+
+		/**
+		 * Advances to the next pair of key groups that is relevant for the given join type.
+		 *
+		 * <p>For FULL every pair of groups is relevant. For LEFT (RIGHT), pairs whose left
+		 * (right) subset starts with {@code null} are skipped, and the iterator over the left
+		 * (right) subset is left in place, reset to its start.
+		 *
+		 * @return {@code true} if a relevant pair of groups was found
+		 * @throws IllegalArgumentException if the join type is none of LEFT, RIGHT or FULL
+		 */
+		private boolean nextGroups(OuterJoinType outerJoinType) throws IOException {
+			if (outerJoinType == OuterJoinType.FULL) {
+				return nextGroups();
+			} else if (outerJoinType == OuterJoinType.LEFT) {
+				boolean leftContainsElements = false;
+				while (!leftContainsElements && nextGroups()) {
+					currLeftIterator = new ListIteratorWrapper(currLeftSubset.iterator());
+					if (currLeftIterator.next() != null) {
+						leftContainsElements = true;
+					}
+					currLeftIterator.reset();
+				}
+				return leftContainsElements;
+			} else if (outerJoinType == OuterJoinType.RIGHT) {
+				boolean rightContainsElements = false;
+				while (!rightContainsElements && nextGroups()) {
+					currRightIterator = new ListIteratorWrapper(currRightSubset.iterator());
+					if (currRightIterator.next() != null) {
+						rightContainsElements = true;
+					}
+					currRightIterator.reset();
+				}
+				return rightContainsElements;
+			} else {
+				throw new IllegalArgumentException("Outer join of type '" + outerJoinType + "' not supported.");
+			}
+		}
+
+		/**
+		 * Advances the merge by one step and sets the current left and right subsets.
+		 *
+		 * <p>An input is only advanced to its next key group if it is not exhausted and its
+		 * current group was emitted in the previous step. A side without a matching group is
+		 * represented by a singleton collection containing {@code null}.
+		 *
+		 * @return {@code false} if both inputs are exhausted, {@code true} otherwise
+		 */
+		private boolean nextGroups() throws IOException {
+			boolean firstEmpty = true;
+			boolean secondEmpty = true;
+
+			if (this.matchStatus != MatchStatus.FIRST_EMPTY) {
+				if (this.matchStatus == MatchStatus.FIRST_REMAINED) {
+					// comparator is still set correctly
+					firstEmpty = false;
+				} else {
+					if (this.leftGroupedIterator.nextKey()) {
+						this.pairComparator.setReference(leftGroupedIterator.getValues().getCurrent());
+						firstEmpty = false;
+					}
+				}
+			}
+
+			if (this.matchStatus != MatchStatus.SECOND_EMPTY) {
+				if (this.matchStatus == MatchStatus.SECOND_REMAINED) {
+					secondEmpty = false;
+				} else {
+					if (rightGroupedIterator.nextKey()) {
+						secondEmpty = false;
+					}
+				}
+			}
+
+			if (firstEmpty && secondEmpty) {
+				// both inputs are empty
+				return false;
+			} else if (firstEmpty && !secondEmpty) {
+				// input1 is empty, input2 not
+				this.currLeftSubset = Collections.singleton(null);
+				this.currRightSubset = this.rightGroupedIterator.getValues();
+				this.matchStatus = MatchStatus.FIRST_EMPTY;
+				return true;
+			} else if (!firstEmpty && secondEmpty) {
+				// input1 is not empty, input2 is empty
+				this.currLeftSubset = this.leftGroupedIterator.getValues();
+				this.currRightSubset = Collections.singleton(null);
+				this.matchStatus = MatchStatus.SECOND_EMPTY;
+				return true;
+			} else {
+				// both inputs are not empty
+				final int comp = this.pairComparator.compareToReference(rightGroupedIterator.getValues().getCurrent());
+
+				if (0 == comp) {
+					// keys match
+					this.currLeftSubset = this.leftGroupedIterator.getValues();
+					this.currRightSubset = this.rightGroupedIterator.getValues();
+					this.matchStatus = MatchStatus.NONE_REMAINED;
+				} else if (0 < comp) {
+					// key1 goes first
+					this.currLeftSubset = this.leftGroupedIterator.getValues();
+					this.currRightSubset = Collections.singleton(null);
+					this.matchStatus = MatchStatus.SECOND_REMAINED;
+				} else {
+					// key2 goes first
+					this.currLeftSubset = Collections.singleton(null);
+					this.currRightSubset = this.rightGroupedIterator.getValues();
+					this.matchStatus = MatchStatus.FIRST_REMAINED;
+				}
+				return true;
+			}
+		}
+
+		/** Returns the left record of the current pair, or {@code null} if it has no left partner. */
+		private IN1 getLeft() {
+			return leftReturn;
+		}
+
+		/** Returns the right record of the current pair, or {@code null} if it has no right partner. */
+		private IN2 getRight() {
+			return rightReturn;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
new file mode 100644
index 0000000..679e4ce
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators.base;
+
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the collection-based execution of {@link OuterJoinOperatorBase} with string inputs that
+ * are joined on their full value. The join function renders every pair as {@code "left,right"},
+ * so a missing partner shows up as the text {@code null}. Expected results are listed in the
+ * order in which the operator emits them.
+ */
+@SuppressWarnings("serial")
+public class OuterJoinOperatorBaseTest implements Serializable {
+
+	/** Emits the textual form of both join partners, separated by a comma. */
+	private final FlatJoinFunction<String, String, String> joiner = new FlatJoinFunction<String, String, String>() {
+		@Override
+		public void join(String first, String second, Collector<String> out) throws Exception {
+			final String leftText = String.valueOf(first);
+			final String rightText = String.valueOf(second);
+			out.collect(Joiner.on(',').join(leftText, rightText));
+		}
+	};
+
+	/** Operator under test; every test selects the outer join type before running it. */
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	private final OuterJoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>> baseOperator =
+			new OuterJoinOperatorBase(
+					joiner,
+					new BinaryOperatorInformation(
+							BasicTypeInfo.STRING_TYPE_INFO,
+							BasicTypeInfo.STRING_TYPE_INFO,
+							BasicTypeInfo.STRING_TYPE_INFO),
+					new int[0],
+					new int[0],
+					"TestJoiner",
+					null);
+
+	@Test
+	public void testFullOuterJoinWithoutMatchingPartners() throws Exception {
+		baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
+
+		final List<String> left = Arrays.asList("foo", "bar", "foobar");
+		final List<String> right = Arrays.asList("oof", "rab", "raboof");
+		final List<String> expectedPairs =
+				Arrays.asList("bar,null", "foo,null", "foobar,null", "null,oof", "null,rab", "null,raboof");
+
+		testOuterJoin(left, right, expectedPairs);
+	}
+
+	@Test
+	public void testFullOuterJoinWithFullMatchingKeys() throws Exception {
+		baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
+
+		final List<String> left = Arrays.asList("foo", "bar", "foobar");
+		final List<String> right = Arrays.asList("bar", "foobar", "foo");
+		final List<String> expectedPairs = Arrays.asList("bar,bar", "foo,foo", "foobar,foobar");
+
+		testOuterJoin(left, right, expectedPairs);
+	}
+
+	@Test
+	public void testFullOuterJoinWithEmptyLeftInput() throws Exception {
+		baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
+
+		final List<String> left = Arrays.asList();
+		final List<String> right = Arrays.asList("foo", "bar", "foobar");
+		final List<String> expectedPairs = Arrays.asList("null,bar", "null,foo", "null,foobar");
+
+		testOuterJoin(left, right, expectedPairs);
+	}
+
+	@Test
+	public void testFullOuterJoinWithEmptyRightInput() throws Exception {
+		baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
+
+		final List<String> left = Arrays.asList("foo", "bar", "foobar");
+		final List<String> right = Arrays.asList();
+		final List<String> expectedPairs = Arrays.asList("bar,null", "foo,null", "foobar,null");
+
+		testOuterJoin(left, right, expectedPairs);
+	}
+
+	@Test
+	public void testFullOuterJoinWithPartialMatchingKeys() throws Exception {
+		baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
+
+		final List<String> left = Arrays.asList("foo", "bar", "foobar");
+		final List<String> right = Arrays.asList("bar", "foo", "barfoo");
+		final List<String> expectedPairs = Arrays.asList("bar,bar", "null,barfoo", "foo,foo", "foobar,null");
+
+		testOuterJoin(left, right, expectedPairs);
+	}
+
+	@Test
+	public void testFullOuterJoinBuildingCorrectCrossProducts() throws Exception {
+		baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
+
+		final List<String> left = Arrays.asList("foo", "foo", "foo", "bar","bar", "foobar", "foobar");
+		final List<String> right = Arrays.asList("foo", "foo", "bar", "bar", "bar", "barfoo", "barfoo");
+		// 2 x 3 "bar" pairs, the unmatched right "barfoo" records, 3 x 2 "foo" pairs,
+		// and finally the unmatched left "foobar" records
+		final List<String> expectedPairs = Arrays.asList(
+				"bar,bar", "bar,bar", "bar,bar", "bar,bar", "bar,bar", "bar,bar",
+				"null,barfoo", "null,barfoo",
+				"foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo",
+				"foobar,null", "foobar,null");
+
+		testOuterJoin(left, right, expectedPairs);
+	}
+
+	@Test
+	public void testLeftOuterJoin() throws Exception {
+		baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.LEFT);
+
+		final List<String> left = Arrays.asList("foo", "foo", "foo", "bar","bar", "foobar", "foobar");
+		final List<String> right = Arrays.asList("foo", "foo", "bar", "bar", "bar", "barfoo", "barfoo");
+		// unmatched right records ("barfoo") must not appear
+		final List<String> expectedPairs = Arrays.asList(
+				"bar,bar", "bar,bar", "bar,bar", "bar,bar", "bar,bar", "bar,bar",
+				"foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo",
+				"foobar,null", "foobar,null");
+
+		testOuterJoin(left, right, expectedPairs);
+	}
+
+	@Test
+	public void testRightOuterJoin() throws Exception {
+		baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.RIGHT);
+
+		final List<String> left = Arrays.asList("foo", "foo", "foo", "bar","bar", "foobar", "foobar");
+		final List<String> right = Arrays.asList("foo", "foo", "bar", "bar", "bar", "barfoo", "barfoo");
+		// unmatched left records ("foobar") must not appear
+		final List<String> expectedPairs = Arrays.asList(
+				"bar,bar", "bar,bar", "bar,bar", "bar,bar", "bar,bar", "bar,bar",
+				"null,barfoo", "null,barfoo",
+				"foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo", "foo,foo");
+
+		testOuterJoin(left, right, expectedPairs);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testThatExceptionIsThrownForOuterJoinTypeNull() throws Exception {
+		baseOperator.setOuterJoinType(null);
+
+		final List<String> left = Arrays.asList("foo", "bar", "foobar");
+		final List<String> right = Arrays.asList("bar", "foobar", "foo");
+
+		final ExecutionConfig config = new ExecutionConfig();
+		config.disableObjectReuse();
+		baseOperator.executeOnCollections(left, right, null, config);
+	}
+
+	/**
+	 * Runs the operator on the given inputs, first with object reuse disabled and then with
+	 * object reuse enabled, and checks that both runs produce exactly the expected list.
+	 */
+	private void testOuterJoin(List<String> leftInput, List<String> rightInput, List<String> expected) throws Exception {
+		final ExecutionConfig config = new ExecutionConfig();
+
+		config.disableObjectReuse();
+		final List<String> withoutReuse = baseOperator.executeOnCollections(leftInput, rightInput, null, config);
+
+		config.enableObjectReuse();
+		final List<String> withReuse = baseOperator.executeOnCollections(leftInput, rightInput, null, config);
+
+		assertEquals(expected, withoutReuse);
+		assertEquals(expected, withReuse);
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
index 30786aa..d942b72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
 import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
index 3cccab8..ae05d1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
 import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
index c93637e..6fc8abd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
 import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
index d109cf8..74faeb3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AbstractMergeOuterJoinIterator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.operators.sort;
 
 import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -37,8 +38,6 @@ import java.util.Iterator;
  */
 public abstract class AbstractMergeOuterJoinIterator<T1, T2, O> extends AbstractMergeIterator<T1, T2, O> {
 
-	public enum OuterJoinType {LEFT, RIGHT, FULL}
-
 	private final OuterJoinType outerJoinType;
 
 	private boolean initialized = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
index db47f16..f2faa2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NonReusingMergeOuterJoinIterator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.sort;
 
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
index 8382b86..33d72d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/ReusingMergeOuterJoinIterator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.sort;
 
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
index 0c0e836..7b27fa9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.sort;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
 import org.apache.flink.api.common.typeutils.GenericPairComparator;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
@@ -37,7 +38,6 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
 import org.apache.flink.runtime.operators.testutils.CollectionIterator;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
index 7272595..e930317 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeOuterJoinIteratorITCase.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.operators.sort;
 
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b222276/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
index 2cec393..cca1b76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeOuterJoinIteratorITCase.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.runtime.operators.sort;
 
+import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase.OuterJoinType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AbstractMergeOuterJoinIterator.OuterJoinType;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.Test;