You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/03/02 17:33:29 UTC
[1/2] flink git commit: [FLINK-1569] Disable object-reuse for collection execution
Repository: flink
Updated Branches:
refs/heads/master 84c11e690 -> bbe6e8af7
[FLINK-1569] Disable object-reuse for collection execution
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbe6e8af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbe6e8af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbe6e8af
Branch: refs/heads/master
Commit: bbe6e8af7867b7c32d8769be31dad8c5d9235f49
Parents: 9ff9410
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Mar 2 16:06:24 2015 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Mar 2 16:07:48 2015 +0100
----------------------------------------------------------------------
.../common/operators/CollectionExecutor.java | 11 +-----
.../common/operators/GenericDataSourceBase.java | 4 +-
.../operators/base/CoGroupOperatorBase.java | 16 +++-----
.../operators/base/CrossOperatorBase.java | 33 ++++++----------
.../operators/base/FilterOperatorBase.java | 2 +-
.../operators/base/FlatMapOperatorBase.java | 26 ++++---------
.../operators/base/GroupReduceOperatorBase.java | 41 +++++++-------------
.../common/operators/base/JoinOperatorBase.java | 16 ++------
.../common/operators/base/MapOperatorBase.java | 24 ++++--------
.../base/MapPartitionOperatorBase.java | 22 ++++-------
.../operators/base/ReduceOperatorBase.java | 36 +++++------------
.../operators/util/ListKeyGroupedIterator.java | 6 +--
12 files changed, 74 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index afccd7c..2f9ae9a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -69,18 +69,11 @@ public class CollectionExecutor {
private final ClassLoader classLoader;
- private final boolean mutableObjectSafeMode;
-
private final ExecutionConfig executionConfig;
-
+
// --------------------------------------------------------------------------------------------
public CollectionExecutor(ExecutionConfig executionConfig) {
- this(executionConfig, DEFAULT_MUTABLE_OBJECT_SAFE_MODE);
- }
-
- public CollectionExecutor(ExecutionConfig executionConfig, boolean mutableObjectSafeMode) {
- this.mutableObjectSafeMode = mutableObjectSafeMode;
this.executionConfig = executionConfig;
this.intermediateResults = new HashMap<Operator<?>, List<?>>();
@@ -171,7 +164,7 @@ public class CollectionExecutor {
private <OUT> List<OUT> executeDataSource(GenericDataSourceBase<?, ?> source) throws Exception {
@SuppressWarnings("unchecked")
GenericDataSourceBase<OUT, ?> typedSource = (GenericDataSourceBase<OUT, ?>) source;
- return typedSource.executeOnCollections(executionConfig, mutableObjectSafeMode);
+ return typedSource.executeOnCollections(executionConfig);
}
private <IN, OUT> List<OUT> executeUnaryOperator(SingleInputOperator<?, ?, ?> operator, int superStep) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
index 912d13d..05babf4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
@@ -203,7 +203,7 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O
// --------------------------------------------------------------------------------------------
- protected List<OUT> executeOnCollections(ExecutionConfig executionConfig, boolean mutableObjectSafe) throws Exception {
+ protected List<OUT> executeOnCollections(ExecutionConfig executionConfig) throws Exception {
@SuppressWarnings("unchecked")
InputFormat<OUT, InputSplit> inputFormat = (InputFormat<OUT, InputSplit>) this.formatWrapper.getUserCodeObject();
inputFormat.configure(this.parameters);
@@ -220,7 +220,7 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O
while (!inputFormat.reachedEnd()) {
OUT next = inputFormat.nextRecord(serializer.createInstance());
if (next != null) {
- result.add(mutableObjectSafe ? serializer.copy(next) : next);
+ result.add(serializer.copy(next));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
index 4165f3d..dbebeb4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.CopyingListCollector;
import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.DualInputOperator;
import org.apache.flink.api.common.operators.Ordering;
@@ -198,8 +197,6 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
int[] inputKeys1 = getKeyColumns(0);
int[] inputKeys2 = getKeyColumns(1);
- boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
-
boolean[] inputDirections1 = new boolean[inputKeys1.length];
boolean[] inputDirections2 = new boolean[inputKeys2.length];
Arrays.fill(inputDirections1, true);
@@ -254,7 +251,7 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
CoGroupSortListIterator<IN1, IN2> coGroupIterator =
new CoGroupSortListIterator<IN1, IN2>(input1, inputSortComparator1, inputComparator1, inputSerializer1,
- input2, inputSortComparator2, inputComparator2, inputSerializer2, objectReuseDisabled);
+ input2, inputSortComparator2, inputComparator2, inputSerializer2);
// --------------------------------------------------------------------
// Run UDF
@@ -265,9 +262,7 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
FunctionUtils.openFunction(function, parameters);
List<OUT> result = new ArrayList<OUT>();
- Collector<OUT> resultCollector = objectReuseDisabled ?
- new CopyingListCollector<OUT>(result, getOperatorInfo().getOutputType().createSerializer(executionConfig)) :
- new ListCollector<OUT>(result);
+ Collector<OUT> resultCollector = new CopyingListCollector<OUT>(result, getOperatorInfo().getOutputType().createSerializer(executionConfig));
while (coGroupIterator.next()) {
function.coGroup(coGroupIterator.getValues1(), coGroupIterator.getValues2(), resultCollector);
@@ -306,13 +301,12 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
private CoGroupSortListIterator(
List<IN1> input1, final TypeComparator<IN1> inputSortComparator1, TypeComparator<IN1> inputComparator1, TypeSerializer<IN1> serializer1,
- List<IN2> input2, final TypeComparator<IN2> inputSortComparator2, TypeComparator<IN2> inputComparator2, TypeSerializer<IN2> serializer2,
- boolean copyElements)
+ List<IN2> input2, final TypeComparator<IN2> inputSortComparator2, TypeComparator<IN2> inputComparator2, TypeSerializer<IN2> serializer2)
{
this.pairComparator = new GenericPairComparator<IN1, IN2>(inputComparator1, inputComparator2);
- this.iterator1 = new ListKeyGroupedIterator<IN1>(input1, serializer1, inputComparator1, copyElements);
- this.iterator2 = new ListKeyGroupedIterator<IN2>(input2, serializer2, inputComparator2, copyElements);
+ this.iterator1 = new ListKeyGroupedIterator<IN1>(input1, serializer1, inputComparator1);
+ this.iterator2 = new ListKeyGroupedIterator<IN2>(input2, serializer2, inputComparator2);
// ----------------------------------------------------------------
// Sort
http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
index f20659c..f2c75a5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CrossOperatorBase.java
@@ -91,32 +91,21 @@ public class CrossOperatorBase<IN1, IN2, OUT, FT extends CrossFunction<IN1, IN2,
FunctionUtils.setFunctionRuntimeContext(function, ctx);
FunctionUtils.openFunction(function, this.parameters);
- boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
-
ArrayList<OUT> result = new ArrayList<OUT>(inputData1.size() * inputData2.size());
- if (objectReuseDisabled) {
- TypeSerializer<IN1> inSerializer1 = getOperatorInfo().getFirstInputType().createSerializer(executionConfig);
- TypeSerializer<IN2> inSerializer2 = getOperatorInfo().getSecondInputType().createSerializer(executionConfig);
- TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
-
- for (IN1 element1 : inputData1) {
- for (IN2 element2 : inputData2) {
- IN1 copy1 = inSerializer1.copy(element1);
- IN2 copy2 = inSerializer2.copy(element2);
- OUT o = function.cross(copy1, copy2);
- result.add(outSerializer.copy(o));
- }
- }
- }
- else {
- for (IN1 element1 : inputData1) {
- for (IN2 element2 : inputData2) {
- result.add(function.cross(element1, element2));
- }
+ TypeSerializer<IN1> inSerializer1 = getOperatorInfo().getFirstInputType().createSerializer(executionConfig);
+ TypeSerializer<IN2> inSerializer2 = getOperatorInfo().getSecondInputType().createSerializer(executionConfig);
+ TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+
+ for (IN1 element1 : inputData1) {
+ for (IN2 element2 : inputData2) {
+ IN1 copy1 = inSerializer1.copy(element1);
+ IN2 copy2 = inSerializer2.copy(element2);
+ OUT o = function.cross(copy1, copy2);
+ result.add(outSerializer.copy(o));
}
}
-
+
FunctionUtils.closeFunction(function);
return result;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
index 4db5265..8d4fce5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FilterOperatorBase.java
@@ -59,7 +59,7 @@ public class FilterOperatorBase<T, FT extends FlatMapFunction<T, T>> extends Sin
ArrayList<T> result = new ArrayList<T>(inputData.size());
ListCollector<T> collector = new ListCollector<T>(result);
-
+
for (T element : inputData) {
function.flatMap(element, collector);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
index 615ba87..a356249 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FlatMapOperatorBase.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.CopyingListCollector;
import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.operators.SingleInputOperator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -62,23 +61,14 @@ public class FlatMapOperatorBase<IN, OUT, FT extends FlatMapFunction<IN, OUT>> e
ArrayList<OUT> result = new ArrayList<OUT>(input.size());
- boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
-
- if (objectReuseDisabled) {
- TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
- TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
-
- CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer);
-
- for (IN element : input) {
- IN inCopy = inSerializer.copy(element);
- function.flatMap(inCopy, resultCollector);
- }
- } else {
- ListCollector<OUT> resultCollector = new ListCollector<OUT>(result);
- for (IN element : input) {
- function.flatMap(element, resultCollector);
- }
+ TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
+ TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+
+ CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer);
+
+ for (IN element : input) {
+ IN inCopy = inSerializer.copy(element);
+ function.flatMap(inCopy, resultCollector);
}
FunctionUtils.closeFunction(function);
http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
index f4f7d0f..6d7db89 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.CopyingListCollector;
import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.SingleInputOperator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
@@ -156,8 +155,6 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
GroupReduceFunction<IN, OUT> function = this.userFunction.getUserCodeObject();
- boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
-
UnaryOperatorInformation<IN, OUT> operatorInfo = getOperatorInfo();
TypeInformation<IN> inputType = operatorInfo.getInputType();
@@ -196,39 +193,27 @@ public class GroupReduceOperatorBase<IN, OUT, FT extends GroupReduceFunction<IN,
ArrayList<OUT> result = new ArrayList<OUT>();
if (keyColumns.length == 0) {
- if (objectReuseDisabled) {
- final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
- TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
- List<IN> inputDataCopy = new ArrayList<IN>(inputData.size());
- for (IN in: inputData) {
- inputDataCopy.add(inputSerializer.copy(in));
- }
- CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
-
- function.reduce(inputDataCopy, collector);
- } else {
- ListCollector<OUT> collector = new ListCollector<OUT>(result);
- function.reduce(inputData, collector);
+ final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
+ TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+ List<IN> inputDataCopy = new ArrayList<IN>(inputData.size());
+ for (IN in: inputData) {
+ inputDataCopy.add(inputSerializer.copy(in));
}
+ CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
+
+ function.reduce(inputDataCopy, collector);
} else {
final TypeSerializer<IN> inputSerializer = inputType.createSerializer(executionConfig);
boolean[] keyOrderings = new boolean[keyColumns.length];
final TypeComparator<IN> comparator = ((CompositeType<IN>) inputType).createComparator(keyColumns, keyOrderings, 0, executionConfig);
- ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator, objectReuseDisabled);
+ ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator);
- if (objectReuseDisabled) {
- TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
- CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
+ TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+ CopyingListCollector<OUT> collector = new CopyingListCollector<OUT>(result, outSerializer);
- while (keyedIterator.nextKey()) {
- function.reduce(keyedIterator.getValues(), collector);
- }
- } else {
- ListCollector<OUT> collector = new ListCollector<OUT>(result);
- while (keyedIterator.nextKey()) {
- function.reduce(keyedIterator.getValues(), collector);
- }
+ while (keyedIterator.nextKey()) {
+ function.reduce(keyedIterator.getValues(), collector);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
index 373846f..7f05c8f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/JoinOperatorBase.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.CopyingListCollector;
import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.DualInputOperator;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -150,10 +149,8 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN
TypeInformation<IN2> rightInformation = getOperatorInfo().getSecondInputType();
TypeInformation<OUT> outInformation = getOperatorInfo().getOutputType();
- boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
-
- TypeSerializer<IN1> leftSerializer = objectReuseDisabled ? leftInformation.createSerializer(executionConfig) : null;
- TypeSerializer<IN2> rightSerializer = objectReuseDisabled ? rightInformation.createSerializer(executionConfig) : null;
+ TypeSerializer<IN1> leftSerializer = leftInformation.createSerializer(executionConfig);
+ TypeSerializer<IN2> rightSerializer = rightInformation.createSerializer(executionConfig);
TypeComparator<IN1> leftComparator;
TypeComparator<IN2> rightComparator;
@@ -191,8 +188,7 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN
TypePairComparator<IN1, IN2> pairComparator = new GenericPairComparator<IN1, IN2>(leftComparator, rightComparator);
List<OUT> result = new ArrayList<OUT>();
- Collector<OUT> collector = objectReuseDisabled ? new CopyingListCollector<OUT>(result, outInformation.createSerializer(executionConfig))
- : new ListCollector<OUT>(result);
+ Collector<OUT> collector = new CopyingListCollector<OUT>(result, outInformation.createSerializer(executionConfig));
Map<Integer, List<IN2>> probeTable = new HashMap<Integer, List<IN2>>();
@@ -215,11 +211,7 @@ public class JoinOperatorBase<IN1, IN2, OUT, FT extends FlatJoinFunction<IN1, IN
pairComparator.setReference(left);
for (IN2 right : matchingHashes) {
if (pairComparator.equalToReference(right)) {
- if (objectReuseDisabled) {
- function.join(leftSerializer.copy(left), rightSerializer.copy(right), collector);
- } else {
- function.join(left, right, collector);
- }
+ function.join(leftSerializer.copy(left), rightSerializer.copy(right), collector);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
index cde3b74..033f476 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapOperatorBase.java
@@ -63,23 +63,15 @@ public class MapOperatorBase<IN, OUT, FT extends MapFunction<IN, OUT>> extends S
ArrayList<OUT> result = new ArrayList<OUT>(inputData.size());
- boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
-
- if (objectReuseDisabled) {
- TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
- TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
-
- for (IN element : inputData) {
- IN inCopy = inSerializer.copy(element);
- OUT out = function.map(inCopy);
- result.add(outSerializer.copy(out));
- }
- } else {
- for (IN element : inputData) {
- result.add(function.map(element));
- }
+ TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
+ TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+
+ for (IN element : inputData) {
+ IN inCopy = inSerializer.copy(element);
+ OUT out = function.map(inCopy);
+ result.add(outSerializer.copy(out));
}
-
+
FunctionUtils.closeFunction(function);
return result;
http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
index 25b3bb8..81c2ad4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/MapPartitionOperatorBase.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.CopyingIterator;
import org.apache.flink.api.common.functions.util.CopyingListCollector;
import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.operators.SingleInputOperator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
@@ -66,20 +65,13 @@ public class MapPartitionOperatorBase<IN, OUT, FT extends MapPartitionFunction<I
ArrayList<OUT> result = new ArrayList<OUT>(inputData.size() / 4);
- boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
-
- if (objectReuseDisabled) {
- TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
- TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
-
- CopyingIterator<IN> source = new CopyingIterator<IN>(inputData.iterator(), inSerializer);
- CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer);
-
- function.mapPartition(source, resultCollector);
- } else {
- ListCollector<OUT> resultCollector = new ListCollector<OUT>(result);
- function.mapPartition(inputData, resultCollector);
- }
+ TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
+ TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
+
+ CopyingIterator<IN> source = new CopyingIterator<IN>(inputData.iterator(), inSerializer);
+ CopyingListCollector<OUT> resultCollector = new CopyingListCollector<OUT>(result, outSerializer);
+
+ function.mapPartition(source, resultCollector);
result.trimToSize();
FunctionUtils.closeFunction(function);
http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
index d3d61e9..e843ee6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
@@ -156,7 +156,6 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
return Collections.emptyList();
}
- boolean objectReuseDisabled = !executionConfig.isObjectReuseEnabled();
ReduceFunction<T> function = this.userFunction.getUserCodeObject();
UnaryOperatorInformation<T, T> operatorInfo = getOperatorInfo();
@@ -185,22 +184,14 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
T existing = aggregateMap.get(wrapper);
T result;
- if (objectReuseDisabled) {
- if (existing != null) {
- result = function.reduce(existing, serializer.copy(next));
- } else {
- result = next;
- }
-
- result = serializer.copy(result);
+ if (existing != null) {
+ result = function.reduce(existing, serializer.copy(next));
} else {
- if (existing != null) {
- result = function.reduce(existing, next);
- } else {
- result = next;
- }
+ result = next;
}
+ result = serializer.copy(result);
+
aggregateMap.put(wrapper, result);
}
@@ -211,18 +202,11 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
else {
T aggregate = inputData.get(0);
- if (objectReuseDisabled) {
- aggregate = serializer.copy(aggregate);
-
- for (int i = 1; i < inputData.size(); i++) {
- T next = function.reduce(aggregate, serializer.copy(inputData.get(i)));
- aggregate = serializer.copy(next);
- }
- }
- else {
- for (int i = 1; i < inputData.size(); i++) {
- aggregate = function.reduce(aggregate, inputData.get(i));
- }
+ aggregate = serializer.copy(aggregate);
+
+ for (int i = 1; i < inputData.size(); i++) {
+ T next = function.reduce(aggregate, serializer.copy(inputData.get(i)));
+ aggregate = serializer.copy(next);
}
FunctionUtils.setFunctionRuntimeContext(function, ctx);
http://git-wip-us.apache.org/repos/asf/flink/blob/bbe6e8af/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
index 673440d..bb40c98 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/ListKeyGroupedIterator.java
@@ -51,13 +51,13 @@ public final class ListKeyGroupedIterator<E> {
* @param input The list with the input elements.
* @param comparator The comparator for the data type iterated over.
*/
- public ListKeyGroupedIterator(List<E> input, TypeSerializer<E> serializer, TypeComparator<E> comparator, boolean copy) {
+ public ListKeyGroupedIterator(List<E> input, TypeSerializer<E> serializer, TypeComparator<E> comparator) {
if (input == null || comparator == null) {
throw new NullPointerException();
}
this.input = input;
- this.serializer = copy ? serializer : null;
+ this.serializer = serializer;
this.comparator = comparator;
this.done = input.isEmpty() ? true : false;
@@ -176,7 +176,7 @@ public final class ListKeyGroupedIterator<E> {
if (this.next != null) {
E current = this.next;
this.next = ListKeyGroupedIterator.this.advanceToNext();
- return serializer != null ? serializer.copy(current) : current;
+ return serializer.copy(current);
} else {
throw new NoSuchElementException();
}
[2/2] flink git commit: [hotfix] Fix Scala type analysis for classes that extend Collections
Posted by al...@apache.org.
[hotfix] Fix Scala type analysis for classes that extend Collections
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ff94105
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ff94105
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ff94105
Branch: refs/heads/master
Commit: 9ff941052f6af78ee1a88f86ae5dd21b47e2dbe1
Parents: 84c11e6
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Mar 2 15:29:50 2015 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Mar 2 16:07:48 2015 +0100
----------------------------------------------------------------------
.../flink/api/scala/codegen/TypeAnalyzer.scala | 18 +++++++++++++++++-
1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9ff94105/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
index 3121815..541ba20 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeAnalyzer.scala
@@ -18,6 +18,7 @@
package org.apache.flink.api.scala.codegen
import scala.collection._
+import scala.collection.generic.CanBuildFrom
import scala.reflect.macros.Context
import scala.util.DynamicVariable
@@ -288,7 +289,22 @@ private[flink] trait TypeAnalyzer[C <: Context] { this: MacroContextHolder[C]
traversable match {
case TypeRef(_, _, elemTpe :: Nil) =>
- Some(elemTpe.asSeenFrom(tpe, tpe.typeSymbol))
+
+ // determine whether we can find an implicit for the CanBuildFrom because
+ // TypeInformationGen requires this. This catches the case where a user
+ // has a custom class that implements Iterable[], for example.
+ val cbfTpe = TypeRef(
+ typeOf[CanBuildFrom[_, _, _]],
+ typeOf[CanBuildFrom[_, _, _]].typeSymbol,
+ tpe :: elemTpe :: tpe :: Nil)
+
+ val cbf = c.inferImplicitValue(cbfTpe, silent = true)
+
+ if (cbf == EmptyTree) {
+ None
+ } else {
+ Some(elemTpe.asSeenFrom(tpe, tpe.typeSymbol))
+ }
case _ => None
}