You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this text.
Posted to commits@flink.apache.org by al...@apache.org on 2015/03/02 17:33:29 UTC

[1/2] flink git commit: [FLINK-1569] Disable object-reuse for collection execution

Repository: flink
Updated Branches:
  refs/heads/master 84c11e690 -> bbe6e8af7


[FLINK-1569] Disable object-reuse for collection execution


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbe6e8af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbe6e8af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbe6e8af

Branch: refs/heads/master
Commit: bbe6e8af7867b7c32d8769be31dad8c5d9235f49
Parents: 9ff9410
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Mar 2 16:06:24 2015 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Mar 2 16:07:48 2015 +0100

----------------------------------------------------------------------
 .../common/operators/CollectionExecutor.java    | 11 +-----
 .../common/operators/GenericDataSourceBase.java |  4 +-
 .../operators/base/CoGroupOperatorBase.java     | 16 +++-----
 .../operators/base/CrossOperatorBase.java       | 33 ++++++----------
 .../operators/base/FilterOperatorBase.java      |  2 +-
 .../operators/base/FlatMapOperatorBase.java     | 26 ++++---------
 .../operators/base/GroupReduceOperatorBase.java | 41 +++++++-------------
 .../common/operators/base/JoinOperatorBase.java | 16 ++------
 .../common/operators/base/MapOperatorBase.java  | 24 ++++--------
 .../base/MapPartitionOperatorBase.java          | 22 ++++-------
 .../operators/base/ReduceOperatorBase.java      | 36 +++++------------
 .../operators/util/ListKeyGroupedIterator.java  |  6 +--
 12 files changed, 74 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index afccd7c..2f9ae9a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -69,18 +69,11 @@ public class CollectionExecutor {
 	
 	private final ClassLoader classLoader;
 	
-	private final boolean mutableObjectSafeMode;
-
 	private final ExecutionConfig executionConfig;
-	
+
 	// --------------------------------------------------------------------------------------------
 	
 	public CollectionExecutor(ExecutionConfig executionConfig) {
-		this(executionConfig, DEFAULT_MUTABLE_OBJECT_SAFE_MODE);
-	}
-		
-	public CollectionExecutor(ExecutionConfig executionConfig, boolean mutableObjectSafeMode) {
-		this.mutableObjectSafeMode = mutableObjectSafeMode;
 		this.executionConfig = executionConfig;
 		
 		this.intermediateResults = new HashMap<Operator<?>, List<?>>();
@@ -171,7 +164,7 @@ public class CollectionExecutor {
 	private <OUT> List<OUT> executeDataSource(GenericDataSourceBase<?, ?> source) throws Exception {
 		@SuppressWarnings("unchecked")
 		GenericDataSourceBase<OUT, ?> typedSource = (GenericDataSourceBase<OUT, ?>) source;
-		return typedSource.executeOnCollections(executionConfig, mutableObjectSafeMode);
+		return typedSource.executeOnCollections(executionConfig);
 	}
 	
 	private <IN, OUT> List<OUT> executeUnaryOperator(SingleInputOperator<?, ?, ?> operator, int superStep) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
index 912d13d..05babf4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
@@ -203,7 +203,7 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O
 	
 	// --------------------------------------------------------------------------------------------
 	
-	protected List<OUT> executeOnCollections(ExecutionConfig executionConfig, boolean mutableObjectSafe) throws Exception {
+	protected List<OUT> executeOnCollections(ExecutionConfig executionConfig) throws Exception {
 		@SuppressWarnings("unchecked")
 		InputFormat<OUT, InputSplit> inputFormat = (InputFormat<OUT, InputSplit>) this.formatWrapper.getUserCodeObject();
 		inputFormat.configure(this.parameters);
@@ -220,7 +220,7 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O
 			while (!inputFormat.reachedEnd()) {
 				OUT next = inputFormat.nextRecord(serializer.createInstance());
 				if (next != null) {
-					result.add(mutableObjectSafe ? serializer.copy(next) : next);
+					result.add(serializer.copy(next));
 				}
 			}
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
index 4165f3d..dbebeb4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.CopyingListCollector;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputOperator;
 import org.apache.flink.api.common.operators.Ordering;
@@ -198,8 +197,6 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
 		int[] inputKeys1 = getKeyColumns(0);
 		int[] inputKeys2 = getKeyColumns(1);
 
-		boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
-		
 		boolean[] inputDirections1 = new boolean[inputKeys1.length];
 		boolean[] inputDirections2 = new boolean[inputKeys2.length];
 		Arrays.fill(inputDirections1, true);
@@ -254,7 +251,7 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
 
 		CoGroupSortListIterator<IN1, IN2> coGroupIterator =
 				new CoGroupSortListIterator<IN1, IN2>(input1, inputSortComparator1, inputComparator1, inputSerializer1,
-						input2, inputSortComparator2, inputComparator2, inputSerializer2, objectReuseDisabled);
+						input2, inputSortComparator2, inputComparator2, inputSerializer2);
 
 		// --------------------------------------------------------------------
 		// Run UDF
@@ -265,9 +262,7 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
 		FunctionUtils.openFunction(function, parameters);
 
 		List<OUT> result = new ArrayList<OUT>();
-		Collector<OUT> resultCollector = objectReuseDisabled ?
-				new CopyingListCollector<OUT>(result, getOperatorInfo().getOutputType().createSerializer(executionConfig)) :
-				new ListCollector<OUT>(result);
+		Collector<OUT> resultCollector = new CopyingListCollector<OUT>(result, getOperatorInfo().getOutputType().createSerializer(executionConfig));
 
 		while (coGroupIterator.next()) {
 			function.coGroup(coGroupIterator.getValues1(), coGroupIterator.getValues2(), resultCollector);
@@ -306,13 +301,12 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
 
 		private CoGroupSortListIterator(
 				List<IN1> input1, final TypeComparator<IN1> inputSortComparator1, TypeComparator<IN1> inputComparator1, TypeSerializer<IN1> serializer1,
-				List<IN2> input2, final TypeComparator<IN2> inputSortComparator2, TypeComparator<IN2> inputComparator2, TypeSerializer<IN2> serializer2,
-				boolean copyElements)
+				List<IN2> input2, final TypeComparator<IN2> inputSortComparator2, TypeComparator<IN2> inputComparator2, TypeSerializer<IN2> serializer2)
 		{
 			this.pairComparator = new GenericPairComparator<IN1, IN2>(inputComparator1, inputComparator2);
 
-			this.iterator1 = new ListKeyGroupedIterator<IN1>(input1, serializer1, inputComparator1, copyElements);
-			this.iterator2 = new ListKeyGroupedIterator<IN2>(input2, serializer2, inputComparator2, copyElements);
+			this.iterator1 = new ListKeyGroupedIterator<IN1>(input1, serializer1, inputComparator1);
+			this.iterator2 = new ListKeyGroupedIterator<IN2>(input2, serializer2, inputComparator2);
 
 			// ----------------------------------------------------------------
 			// Sort

http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
index f20659c..f2c75a5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
@@ -91,32 +91,21 @@ public class CrossOperatorBase<IN1, IN2, OUT, FT extends CrossFunction<IN1, IN2,
 		FunctionUtils.setFunctionRuntimeContext(function, ctx);
 		FunctionUtils.openFunction(function, this.parameters);
 
-		boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
-		
 		ArrayList<OUT> result = new ArrayList<OUT>(inputData1.size() * inputData2.size());
 		
-		if (objectReuseDisabled) {
-			TypeSerializer<IN1> inSerializer1 = getOperatorInfo().getFirstInputType().createSerializer(executionConfig);
-			TypeSerializer<IN2> inSerializer2 = getOperatorInfo().getSecondInputType().createSerializer(executionConfig);
-			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
-			
-			for (IN1 element1 : inputData1) {
-				for (IN2 element2 : inputData2) {
-					IN1 copy1 = inSerializer1.copy(element1);
-					IN2 copy2 = inSerializer2.copy(element2);
-					OUT o = function.cross(copy1, copy2);
-					result.add(outSerializer.copy(o));
-				}
-			}
-		}
-		else {
-			for (IN1 element1 : inputData1) {
-				for (IN2 element2 : inputData2) {
-					result.add(function.cross(element1, element2));
-				}
+		TypeSerializer<IN1> inSerializer1 = getOperatorInfo().getFirstInputType().createSerializer(executionConfig);
+		TypeSerializer<IN2> inSerializer2 = getOperatorInfo().getSecondInputType().createSerializer(executionConfig);
+		TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+
+		for (IN1 element1 : inputData1) {
+			for (IN2 element2 : inputData2) {
+				IN1 copy1 = inSerializer1.copy(element1);
+				IN2 copy2 = inSerializer2.copy(element2);
+				OUT o = function.cross(copy1, copy2);
+				result.add(outSerializer.copy(o));
 			}
 		}
-		
+
 		FunctionUtils.closeFunction(function);
 		return result;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
index 4db5265..8d4fce5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
@@ -59,7 +59,7 @@ public class FilterOperatorBase<T, FT extends FlatMapFunction<T, T>> extends Sin
 		
 		ArrayList<T> result = new ArrayList<T>(inputData.size());
 		ListCollector<T> collector = new ListCollector<T>(result);
-		
+
 		for (T element : inputData) {
 			function.flatMap(element, collector);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
index 615ba87..a356249 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.CopyingListCollector;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -62,23 +61,14 @@ public class FlatMapOperatorBase<IN, OUT, FT extends FlatMapFunction<IN, OUT>> e
 
 		ArrayList<OUT> result = new ArrayList<OUT>(input.size());
 
-		boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
-		
-		if (objectReuseDisabled) {
-			TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
-			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
-			
-			CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer);
-			
-			for (IN element : input) {
-				IN inCopy = inSerializer.copy(element);
-				function.flatMap(inCopy, resultCollector);
-			}
-		} else {
-			ListCollector<OUT> resultCollector = new ListCollector<OUT>(result);
-			for (IN element : input) {
-				function.flatMap(element, resultCollector);
-			}
+		TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
+		TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+
+		CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer);
+
+		for (IN element : input) {
+			IN inCopy = inSerializer.copy(element);
+			function.flatMap(inCopy, resultCollector);
 		}
 
 		FunctionUtils.closeFunction(function);

http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index f4f7d0f..6d7db89 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.CopyingListCollector;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
@@ -156,8 +155,6 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
 		GroupReduceFunction<IN, OUT> function = this.userFunction.getUserCodeObject();
 
-		boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
-
 		UnaryOperatorInformation<IN, OUT> operatorInfo = getOperatorInfo();
 		TypeInformation<IN> inputType = operatorInfo.getInputType();
 
@@ -196,39 +193,27 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
 		ArrayList<OUT> result = new ArrayList<OUT>();
 
 		if (keyColumns.length == 0) {
-			if (objectReuseDisabled) {
-				final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
-				TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
-				List<IN> inputDataCopy = new ArrayList<IN>(inputData.size());
-				for (IN in: inputData) {
-					inputDataCopy.add(inputSerializer.copy(in));
-				}
-				CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
-
-				function.reduce(inputDataCopy, collector);
-			} else {
-				ListCollector<OUT> collector = new ListCollector<OUT>(result);
-				function.reduce(inputData, collector);
+			final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
+			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+			List<IN> inputDataCopy = new ArrayList<IN>(inputData.size());
+			for (IN in: inputData) {
+				inputDataCopy.add(inputSerializer.copy(in));
 			}
+			CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
+
+			function.reduce(inputDataCopy, collector);
 		} else {
 			final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
 			boolean[] keyOrderings = new boolean[keyColumns.length];
 			final TypeComparator<IN> comparator = ((CompositeType<IN>) inputType).createComparator(keyColumns, keyOrderings, 0, executionConfig);
 
-			ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator, objectReuseDisabled);
+			ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator);
 
-			if (objectReuseDisabled) {
-				TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
-				CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
+			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+			CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
 
-				while (keyedIterator.nextKey()) {
-					function.reduce(keyedIterator.getValues(), collector);
-				}
-			} else {
-				ListCollector<OUT> collector = new ListCollector<OUT>(result);
-				while (keyedIterator.nextKey()) {
-					function.reduce(keyedIterator.getValues(), collector);
-				}
+			while (keyedIterator.nextKey()) {
+				function.reduce(keyedIterator.getValues(), collector);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
index 373846f..7f05c8f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.CopyingListCollector;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputOperator;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -150,10 +149,8 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN
 		TypeInformation<IN2> rightInformation = getOperatorInfo().getSecondInputType();
 		TypeInformation<OUT> outInformation = getOperatorInfo().getOutputType();
 
-		boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
-		
-		TypeSerializer<IN1> leftSerializer = objectReuseDisabled ? leftInformation.createSerializer(executionConfig) : null;
-		TypeSerializer<IN2> rightSerializer = objectReuseDisabled ? rightInformation.createSerializer(executionConfig) : null;
+		TypeSerializer<IN1> leftSerializer = leftInformation.createSerializer(executionConfig);
+		TypeSerializer<IN2> rightSerializer = rightInformation.createSerializer(executionConfig);
 		
 		TypeComparator<IN1> leftComparator;
 		TypeComparator<IN2> rightComparator;
@@ -191,8 +188,7 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN
 		TypePairComparator<IN1, IN2> pairComparator = new GenericPairComparator<IN1, IN2>(leftComparator, rightComparator);
 
 		List<OUT> result = new ArrayList<OUT>();
-		Collector<OUT> collector = objectReuseDisabled ? new CopyingListCollector<OUT>(result, outInformation.createSerializer(executionConfig))
-														: new ListCollector<OUT>(result);
+		Collector<OUT> collector = new CopyingListCollector<OUT>(result, outInformation.createSerializer(executionConfig));
 
 		Map<Integer, List<IN2>> probeTable = new HashMap<Integer, List<IN2>>();
 
@@ -215,11 +211,7 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN
 				pairComparator.setReference(left);
 				for (IN2 right : matchingHashes) {
 					if (pairComparator.equalToReference(right)) {
-						if (objectReuseDisabled) {
-							function.join(leftSerializer.copy(left), rightSerializer.copy(right), collector);
-						} else {
-							function.join(left, right, collector);
-						}
+						function.join(leftSerializer.copy(left), rightSerializer.copy(right), collector);
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
index cde3b74..033f476 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
@@ -63,23 +63,15 @@ public class MapOperatorBase<IN, OUT, FT extends MapFunction<IN, OUT>> extends S
 		
 		ArrayList<OUT> result = new ArrayList<OUT>(inputData.size());
 
-		boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
-		
-		if (objectReuseDisabled) {
-			TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
-			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
-			
-			for (IN element : inputData) {
-				IN inCopy = inSerializer.copy(element);
-				OUT out = function.map(inCopy);
-				result.add(outSerializer.copy(out));
-			}
-		} else {
-			for (IN element : inputData) {
-				result.add(function.map(element));
-			}
+		TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
+		TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+
+		for (IN element : inputData) {
+			IN inCopy = inSerializer.copy(element);
+			OUT out = function.map(inCopy);
+			result.add(outSerializer.copy(out));
 		}
-		
+
 		FunctionUtils.closeFunction(function);
 		
 		return result;

http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
index 25b3bb8..81c2ad4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.CopyingIterator;
 import org.apache.flink.api.common.functions.util.CopyingListCollector;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.functions.util.ListCollector;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -66,20 +65,13 @@ public class MapPartitionOperatorBase<IN, OUT, FT extends MapPartitionFunction<I
 		
 		ArrayList<OUT> result = new ArrayList<OUT>(inputData.size() / 4);
 
-		boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
-		
-		if (objectReuseDisabled) {
-			TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
-			TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
-			
-			CopyingIterator<IN> source = new CopyingIterator<IN>(inputData.iterator(), inSerializer);
-			CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer);
-			
-			function.mapPartition(source, resultCollector);
-		} else {
-			ListCollector<OUT> resultCollector = new ListCollector<OUT>(result);
-			function.mapPartition(inputData, resultCollector);
-		}
+		TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
+		TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+
+		CopyingIterator<IN> source = new CopyingIterator<IN>(inputData.iterator(), inSerializer);
+		CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer);
+
+		function.mapPartition(source, resultCollector);
 
 		result.trimToSize();
 		FunctionUtils.closeFunction(function);

http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
index d3d61e9..e843ee6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
@@ -156,7 +156,6 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
 			return Collections.emptyList();
 		}
 
-		boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
 		ReduceFunction<T> function = this.userFunction.getUserCodeObject();
 
 		UnaryOperatorInformation<T, T> operatorInfo = getOperatorInfo();
@@ -185,22 +184,14 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
 				T existing = aggregateMap.get(wrapper);
 				T result;
 
-				if (objectReuseDisabled) {
-					if (existing != null) {
-						result = function.reduce(existing, serializer.copy(next));
-					} else {
-						result = next;
-					}
-
-					result = serializer.copy(result);
+				if (existing != null) {
+					result = function.reduce(existing, serializer.copy(next));
 				} else {
-					if (existing != null) {
-						result = function.reduce(existing, next);
-					} else {
-						result = next;
-					}
+					result = next;
 				}
 
+				result = serializer.copy(result);
+
 				aggregateMap.put(wrapper, result);
 
 			}
@@ -211,18 +202,11 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
 		else {
 			T aggregate = inputData.get(0);
 			
-			if (objectReuseDisabled) {
-				aggregate = serializer.copy(aggregate);
-				
-				for (int i = 1; i < inputData.size(); i++) {
-					T next = function.reduce(aggregate, serializer.copy(inputData.get(i)));
-					aggregate = serializer.copy(next);
-				}
-			}
-			else {
-				for (int i = 1; i < inputData.size(); i++) {
-					aggregate = function.reduce(aggregate, inputData.get(i));
-				}
+			aggregate = serializer.copy(aggregate);
+
+			for (int i = 1; i < inputData.size(); i++) {
+				T next = function.reduce(aggregate, serializer.copy(inputData.get(i)));
+				aggregate = serializer.copy(next);
 			}
 
 			FunctionUtils.setFunctionRuntimeContext(function, ctx);

http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
index 673440d..bb40c98 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
@@ -51,13 +51,13 @@ public final class ListKeyGroupedIterator<E> {
 	 * @param input The list with the input elements.
 	 * @param comparator The comparator for the data type iterated over.
 	 */
-	public ListKeyGroupedIterator(List<E> input, TypeSerializer<E> serializer, TypeComparator<E> comparator, boolean copy) {
+	public ListKeyGroupedIterator(List<E> input, TypeSerializer<E> serializer, TypeComparator<E> comparator) {
 		if (input == null || comparator == null) {
 			throw new NullPointerException();
 		}
 
 		this.input = input;
-		this.serializer = copy ? serializer : null;
+		this.serializer = serializer;
 		this.comparator = comparator;
 
 		this.done = input.isEmpty() ? true : false;
@@ -176,7 +176,7 @@ public final class ListKeyGroupedIterator<E> {
 			if (this.next != null) {
 				E current = this.next;
 				this.next = ListKeyGroupedIterator.this.advanceToNext();
-				return serializer != null ? serializer.copy(current) : current;
+				return serializer.copy(current);
 			} else {
 				throw new NoSuchElementException();
 			}


[2/2] flink git commit: [hotfix] Fix Scala type analysis for classes that extend Collections

Posted by al...@apache.org.
[hotfix] Fix Scala type analysis for classes that extend Collections


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ff94105
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ff94105
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ff94105

Branch: refs/heads/master
Commit: 9ff941052f6af78ee1a88f86ae5dd21b47e2dbe1
Parents: 84c11e6
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Mar 2 15:29:50 2015 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Mar 2 16:07:48 2015 +0100

----------------------------------------------------------------------
 .../flink/api/scala/codegen/TypeAnalyzer.scala    | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9ff94105/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
index 3121815..541ba20 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.api.scala.codegen
 
 import scala.collection._
+import scala.collection.generic.CanBuildFrom
 import scala.reflect.macros.Context
 import scala.util.DynamicVariable
 
@@ -288,7 +289,22 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
 
           traversable match {
             case TypeRef(_, _, elemTpe :: Nil) =>
-              Some(elemTpe.asSeenFrom(tpe, tpe.typeSymbol))
+
+              // determine whether we can find an implicit for the CanBuildFrom because
+              // TypeInformationGen requires this. This catches the case where a user
+              // has a custom class that implements Iterable[], for example.
+              val cbfTpe = TypeRef(
+                typeOf[CanBuildFrom[_, _, _]],
+                typeOf[CanBuildFrom[_, _, _]].typeSymbol,
+                tpe :: elemTpe :: tpe :: Nil)
+
+              val cbf = c.inferImplicitValue(cbfTpe, silent = true)
+
+              if (cbf == EmptyTree) {
+                None
+              } else {
+                Some(elemTpe.asSeenFrom(tpe, tpe.typeSymbol))
+              }
             case _ => None
           }