You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/06/08 16:00:56 UTC
flink git commit: [FLINK-2054] Add object-reuse switch for streaming
Repository: flink
Updated Branches:
refs/heads/master 7805db813 -> 26304c20a
[FLINK-2054] Add object-reuse switch for streaming
The switch was already there: enableObjectReuse() in ExecutionConfig.
This was simply not considered by the streaming runtime. This change now
draws a copy before forwarding an element to a chained operator when
object reuse is disabled.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/26304c20
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/26304c20
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/26304c20
Branch: refs/heads/master
Commit: 26304c20ab6aab33daba775736061102bd7a2409
Parents: 7805db8
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jun 8 11:32:27 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jun 8 16:00:05 2015 +0200
----------------------------------------------------------------------
docs/apis/streaming_guide.md | 20 +++++++++++
.../api/datastream/DiscretizedStream.java | 12 ++++---
.../api/operators/AbstractStreamOperator.java | 15 +++++++++
.../streaming/api/operators/StreamOperator.java | 6 ++++
.../operators/windowing/StreamWindowBuffer.java | 1 +
.../operators/windowing/WindowFlattener.java | 1 +
.../api/operators/windowing/WindowFolder.java | 1 +
.../api/operators/windowing/WindowMapper.java | 1 +
.../api/operators/windowing/WindowMerger.java | 2 +-
.../operators/windowing/WindowPartitioner.java | 1 +
.../api/operators/windowing/WindowReducer.java | 1 +
.../streaming/runtime/tasks/OutputHandler.java | 35 ++++++++++++++++++--
12 files changed, 87 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 4c7178e..5f63d2c 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -270,6 +270,26 @@ The `DataStream` is the basic data abstraction provided by the Flink Streaming.
The transformations may return different data stream types allowing more elaborate transformations, for example the `groupBy(…)` method returns a `GroupedDataStream` which can be used for grouped transformations such as aggregating by key. We will discover more elaborate data stream types in the upcoming sections.
+### Object Reuse Behavior
+
+Apache Flink is trying to reduce the number of object allocations for better performance.
+
+By default, user-defined functions (like `map()` or `reduce()`) receive new objects on each call
+(or through an iterator), so it is safe to keep references to the objects inside the function
+(for example in a List).
+
+There is a switch at the `ExecutionConfig` which allows users to enable the object reuse mode:
+
+```
+env.getExecutionConfig().enableObjectReuse()
+```
+
+For mutable types, Flink will reuse object
+instances. In practice that means that a `map()` function will always receive the same object
+instance (with its fields set to new values). The object reuse mode will lead to better performance
+because fewer objects are created, but the user has to manually take care of what they are doing
+with the object references.
+
### Partitioning
Partitioning controls how individual data points of a stream are distributed among the parallel instances of the transformation operators. This also controls the ordering of the records in the `DataStream`. There is partial ordering guarantee for the outputs with respect to the partitioning scheme (outputs produced from each partition are guaranteed to arrive in the order they were produced).
http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index 4083bb8..ba28fa4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -198,16 +198,18 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
}
private DiscretizedStream<OUT> filterEmpty(DiscretizedStream<OUT> input) {
- return wrap(input.discretizedStream.transform("Filter", input.discretizedStream.getType(),
- new StreamFilter<StreamWindow<OUT>>(new EmptyWindowFilter<OUT>())), input.isPartitioned);
+ StreamFilter<StreamWindow<OUT>> emptyFilter = new StreamFilter<StreamWindow<OUT>>(new EmptyWindowFilter<OUT>());
+ emptyFilter.disableInputCopy();
+ return wrap(input.discretizedStream.transform("Filter", input.discretizedStream.getType(), emptyFilter), input.isPartitioned);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private DataStream<Tuple2<Integer, Integer>> extractPartsByID(DiscretizedStream<OUT> input) {
+ StreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>> partExtractor = new StreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>>(
+ new WindowPartExtractor<OUT>());
+ partExtractor.disableInputCopy();
return input.discretizedStream.transform("ExtractParts", new TupleTypeInfo(Tuple2.class,
- BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
- new StreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>>(
- new WindowPartExtractor<OUT>()));
+ BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), partExtractor);
}
private DiscretizedStream<OUT> partition(WindowTransformation transformation) {
http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 7bb7780..a365587 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -37,6 +37,8 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
public transient Output<OUT> output;
+ protected boolean inputCopyDisabled = false;
+
// A sane default for most operators
protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
@@ -64,4 +66,17 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
public final ChainingStrategy getChainingStrategy() {
return chainingStrategy;
}
+
+ @Override
+ public boolean isInputCopyingDisabled() {
+ return inputCopyDisabled;
+ }
+
+ /**
+ * Enable object-reuse for this operator instance. This overrides the setting in
+ * the {@link org.apache.flink.api.common.ExecutionConfig}.
+ */
+ public void disableInputCopy() {
+ this.inputCopyDisabled = true;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 43ab2ac..05b15be 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -49,6 +49,12 @@ public interface StreamOperator<OUT> extends Serializable {
*/
public void close() throws Exception;
+ /**
+ * An operator can return true here to disable copying of its input elements. This overrides
+ * the object-reuse setting on the {@link org.apache.flink.api.common.ExecutionConfig}
+ */
+ public boolean isInputCopyingDisabled();
+
public void setChainingStrategy(ChainingStrategy strategy);
public ChainingStrategy getChainingStrategy();
http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
index f890b69..074ff4b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
@@ -37,6 +37,7 @@ public class StreamWindowBuffer<T>
public StreamWindowBuffer(WindowBuffer<T> buffer) {
this.buffer = buffer;
setChainingStrategy(ChainingStrategy.FORCE_ALWAYS);
+ disableInputCopy();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
index 3afc50f..159b6f8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
@@ -32,6 +32,7 @@ public class WindowFlattener<T> extends AbstractStreamOperator<T>
public WindowFlattener() {
chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
+ disableInputCopy();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
index bdf6782..b8f407a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
@@ -39,6 +39,7 @@ public class WindowFolder<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
public WindowFolder(FoldFunction<IN, OUT> folder, OUT initialValue) {
super(new WindowFoldFunction<IN, OUT>(folder, initialValue));
this.folder = folder;
+ disableInputCopy();
}
private static class WindowFoldFunction<IN, OUT> extends AbstractRichFunction implements
http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
index fb2a35c..18a237d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
@@ -39,6 +39,7 @@ public class WindowMapper<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
public WindowMapper(WindowMapFunction<IN, OUT> mapper) {
super(new WindowMap<IN, OUT>(mapper));
this.mapper = mapper;
+ disableInputCopy();
}
private static class WindowMap<T, R> extends AbstractRichFunction
http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
index bb8cfaa..93a92f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
@@ -38,8 +38,8 @@ public class WindowMerger<T> extends AbstractStreamOperator<StreamWindow<T>>
public WindowMerger() {
this.windows = new HashMap<Integer, StreamWindow<T>>();
-
chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
+ disableInputCopy();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
index b86caaa..6b10c16 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
@@ -38,6 +38,7 @@ public class WindowPartitioner<T> extends AbstractStreamOperator<StreamWindow<T>
this.keySelector = keySelector;
chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
+ disableInputCopy();
}
public WindowPartitioner(int numberOfSplits) {
http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
index b6d079e..ff88bab 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
@@ -39,6 +39,7 @@ public class WindowReducer<IN> extends StreamMap<StreamWindow<IN>, StreamWindow<
public WindowReducer(ReduceFunction<IN> reducer) {
super(new WindowReduceFunction<IN>(reducer));
this.reducer = reducer;
+ disableInputCopy();
}
private static class WindowReduceFunction<T> extends AbstractRichFunction implements
http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index 2094d31..effce6c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -157,7 +158,13 @@ public class OutputHandler<OUT> {
chainableOperator.setup(wrapper, chainedContext);
chainedOperators.add(chainableOperator);
- return new OperatorCollector<X>(chainableOperator);
+ if (vertex.getExecutionConfig().isObjectReuseEnabled() || chainableOperator.isInputCopyingDisabled()) {
+ return new OperatorCollector<X>(chainableOperator);
+ } else {
+ return new CopyingOperatorCollector<X>(
+ chainableOperator,
+ (TypeSerializer<X>) chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()).getObjectSerializer());
+ }
}
}
@@ -219,7 +226,7 @@ public class OutputHandler<OUT> {
}
private static class OperatorCollector<T> implements Output<T> {
- private OneInputStreamOperator operator;
+ protected OneInputStreamOperator operator;
public OperatorCollector(OneInputStreamOperator<?, T> operator) {
this.operator = operator;
@@ -239,7 +246,7 @@ public class OutputHandler<OUT> {
}
@Override
- public void close() {
+ public final void close() {
try {
operator.close();
} catch (Exception e) {
@@ -249,4 +256,26 @@ public class OutputHandler<OUT> {
}
}
}
+
+ private static class CopyingOperatorCollector<T> extends OperatorCollector<T> {
+ private final TypeSerializer<T> serializer;
+
+ public CopyingOperatorCollector(OneInputStreamOperator<?, T> operator, TypeSerializer<T> serializer) {
+ super(operator);
+ this.serializer = serializer;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void collect(T record) {
+ try {
+ operator.processElement(serializer.copy(record));
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Could not forward element to operator.", e);
+ }
+ throw new RuntimeException(e);
+ }
+ }
+ }
}