You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/06/08 16:00:56 UTC

flink git commit: [FLINK-2054] Add object-reuse switch for streaming

Repository: flink
Updated Branches:
  refs/heads/master 7805db813 -> 26304c20a


[FLINK-2054] Add object-reuse switch for streaming

The switch already existed: enableObjectReuse() in ExecutionConfig.
However, the streaming runtime simply ignored it. With this change, the
runtime creates a copy of each element before forwarding it to a chained
operator when object reuse is disabled (unless that operator opts out via disableInputCopy()).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/26304c20
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/26304c20
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/26304c20

Branch: refs/heads/master
Commit: 26304c20ab6aab33daba775736061102bd7a2409
Parents: 7805db8
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jun 8 11:32:27 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jun 8 16:00:05 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    | 20 +++++++++++
 .../api/datastream/DiscretizedStream.java       | 12 ++++---
 .../api/operators/AbstractStreamOperator.java   | 15 +++++++++
 .../streaming/api/operators/StreamOperator.java |  6 ++++
 .../operators/windowing/StreamWindowBuffer.java |  1 +
 .../operators/windowing/WindowFlattener.java    |  1 +
 .../api/operators/windowing/WindowFolder.java   |  1 +
 .../api/operators/windowing/WindowMapper.java   |  1 +
 .../api/operators/windowing/WindowMerger.java   |  2 +-
 .../operators/windowing/WindowPartitioner.java  |  1 +
 .../api/operators/windowing/WindowReducer.java  |  1 +
 .../streaming/runtime/tasks/OutputHandler.java  | 35 ++++++++++++++++++--
 12 files changed, 87 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index 4c7178e..5f63d2c 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -270,6 +270,26 @@ The `DataStream` is the basic data abstraction provided by the Flink Streaming.
 
 The transformations may return different data stream types allowing more elaborate transformations, for example the `groupBy(…)` method returns a `GroupedDataStream` which can be used for grouped transformations such as aggregating by key. We will discover more elaborate data stream types in the upcoming sections.
 
+### Object Reuse Behavior
+
+Apache Flink tries to minimize the number of object allocations to improve performance.
+
+By default, user-defined functions (like `map()` or `reduce()`) receive new objects on each call
+(or through an iterator). It is therefore safe to keep references to these objects inside the function
+(for example, in a List).
+
+The `ExecutionConfig` provides a switch that allows users to enable object reuse mode:
+
+```
+env.getExecutionConfig().enableObjectReuse();
+```
+
+When object reuse is enabled, Flink may reuse instances of mutable types. In practice, this means
+that a `map()` function may receive the same object instance on successive calls (with its fields
+set to new values). Object reuse mode leads to better performance because fewer objects are created,
+but users must take care not to hold on to, or modify, object references in ways that assume
+each call receives a fresh instance.
+
 ### Partitioning
 
 Partitioning controls how individual data points of a stream are distributed among the parallel instances of the transformation operators. This also controls the ordering of the records in the `DataStream`. There is partial ordering guarantee for the outputs with respect to the partitioning scheme (outputs produced from each partition are guaranteed to arrive in the order they were produced).

http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index 4083bb8..ba28fa4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -198,16 +198,18 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 	}
 
 	private DiscretizedStream<OUT> filterEmpty(DiscretizedStream<OUT> input) {
-		return wrap(input.discretizedStream.transform("Filter", input.discretizedStream.getType(),
-				new StreamFilter<StreamWindow<OUT>>(new EmptyWindowFilter<OUT>())), input.isPartitioned);
+		StreamFilter<StreamWindow<OUT>> emptyFilter = new StreamFilter<StreamWindow<OUT>>(new EmptyWindowFilter<OUT>());
+		emptyFilter.disableInputCopy();
+		return wrap(input.discretizedStream.transform("Filter", input.discretizedStream.getType(), emptyFilter), input.isPartitioned);
 	}
 
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	private DataStream<Tuple2<Integer, Integer>> extractPartsByID(DiscretizedStream<OUT> input) {
+		StreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>> partExtractor = new StreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>>(
+				new WindowPartExtractor<OUT>());
+		partExtractor.disableInputCopy();
 		return input.discretizedStream.transform("ExtractParts", new TupleTypeInfo(Tuple2.class,
-				BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
-				new StreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>>(
-						new WindowPartExtractor<OUT>()));
+				BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), partExtractor);
 	}
 
 	private DiscretizedStream<OUT> partition(WindowTransformation transformation) {

http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 7bb7780..a365587 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -37,6 +37,8 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
 
 	public transient Output<OUT> output;
 
+	protected boolean inputCopyDisabled = false;
+
 	// A sane default for most operators
 	protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
 
@@ -64,4 +66,17 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
 	public final ChainingStrategy getChainingStrategy() {
 		return chainingStrategy;
 	}
+
+	@Override
+	public boolean isInputCopyingDisabled() {
+		return inputCopyDisabled;
+	}
+
+	/**
+	 * Disables copying of input elements for this operator instance (i.e. enables object reuse for
+	 * its input). This overrides the object-reuse setting in the {@link org.apache.flink.api.common.ExecutionConfig}.
+	 */
+	public void disableInputCopy() {
+		this.inputCopyDisabled = true;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 43ab2ac..05b15be 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -49,6 +49,12 @@ public interface StreamOperator<OUT> extends Serializable {
 	 */
 	public void close() throws Exception;
 
+	/**
+	 * Returns true if elements forwarded to this operator must not be copied first. A value of
+	 * true overrides the object-reuse setting on the {@link org.apache.flink.api.common.ExecutionConfig}.
+	 */
+	public boolean isInputCopyingDisabled();
+
 	public void setChainingStrategy(ChainingStrategy strategy);
 
 	public ChainingStrategy getChainingStrategy();

http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
index f890b69..074ff4b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
@@ -37,6 +37,7 @@ public class StreamWindowBuffer<T>
 	public StreamWindowBuffer(WindowBuffer<T> buffer) {
 		this.buffer = buffer;
 		setChainingStrategy(ChainingStrategy.FORCE_ALWAYS);
+		disableInputCopy();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
index 3afc50f..159b6f8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
@@ -32,6 +32,7 @@ public class WindowFlattener<T> extends AbstractStreamOperator<T>
 
 	public WindowFlattener() {
 		chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
+		disableInputCopy();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
index bdf6782..b8f407a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java
@@ -39,6 +39,7 @@ public class WindowFolder<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
 	public WindowFolder(FoldFunction<IN, OUT> folder, OUT initialValue) {
 		super(new WindowFoldFunction<IN, OUT>(folder, initialValue));
 		this.folder = folder;
+		disableInputCopy();
 	}
 
 	private static class WindowFoldFunction<IN, OUT> extends AbstractRichFunction implements

http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
index fb2a35c..18a237d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java
@@ -39,6 +39,7 @@ public class WindowMapper<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
 	public WindowMapper(WindowMapFunction<IN, OUT> mapper) {
 		super(new WindowMap<IN, OUT>(mapper));
 		this.mapper = mapper;
+		disableInputCopy();
 	}
 
 	private static class WindowMap<T, R> extends AbstractRichFunction

http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
index bb8cfaa..93a92f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
@@ -38,8 +38,8 @@ public class WindowMerger<T> extends AbstractStreamOperator<StreamWindow<T>>
 
 	public WindowMerger() {
 		this.windows = new HashMap<Integer, StreamWindow<T>>();
-
 		chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
+		disableInputCopy();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
index b86caaa..6b10c16 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
@@ -38,6 +38,7 @@ public class WindowPartitioner<T> extends AbstractStreamOperator<StreamWindow<T>
 		this.keySelector = keySelector;
 
 		chainingStrategy = ChainingStrategy.FORCE_ALWAYS;
+		disableInputCopy();
 	}
 
 	public WindowPartitioner(int numberOfSplits) {

http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
index b6d079e..ff88bab 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java
@@ -39,6 +39,7 @@ public class WindowReducer<IN> extends StreamMap<StreamWindow<IN>, StreamWindow<
 	public WindowReducer(ReduceFunction<IN> reducer) {
 		super(new WindowReduceFunction<IN>(reducer));
 		this.reducer = reducer;
+		disableInputCopy();
 	}
 
 	private static class WindowReduceFunction<T> extends AbstractRichFunction implements

http://git-wip-us.apache.org/repos/asf/flink/blob/26304c20/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index 2094d31..effce6c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -157,7 +158,13 @@ public class OutputHandler<OUT> {
 			chainableOperator.setup(wrapper, chainedContext);
 
 			chainedOperators.add(chainableOperator);
-			return new OperatorCollector<X>(chainableOperator);
+			if (vertex.getExecutionConfig().isObjectReuseEnabled() || chainableOperator.isInputCopyingDisabled()) {
+				return new OperatorCollector<X>(chainableOperator);
+			} else {
+				return new CopyingOperatorCollector<X>(
+						chainableOperator,
+						(TypeSerializer<X>) chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()).getObjectSerializer());
+			}
 		}
 
 	}
@@ -219,7 +226,7 @@ public class OutputHandler<OUT> {
 	}
 
 	private static class OperatorCollector<T> implements Output<T> {
-		private OneInputStreamOperator operator;
+		protected OneInputStreamOperator operator;
 
 		public OperatorCollector(OneInputStreamOperator<?, T> operator) {
 			this.operator = operator;
@@ -239,7 +246,7 @@ public class OutputHandler<OUT> {
 		}
 
 		@Override
-		public void close() {
+		public final void close() {
 			try {
 				operator.close();
 			} catch (Exception e) {
@@ -249,4 +256,26 @@ public class OutputHandler<OUT> {
 			}
 		}
 	}
+
+	private static class CopyingOperatorCollector<T> extends OperatorCollector<T> {
+		private final TypeSerializer<T> serializer;
+
+		public CopyingOperatorCollector(OneInputStreamOperator<?, T> operator, TypeSerializer<T> serializer) {
+			super(operator);
+			this.serializer = serializer;
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public void collect(T record) {
+			try {
+				operator.processElement(serializer.copy(record));
+			} catch (Exception e) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error("Could not forward element to operator.", e);
+				}
+				throw new RuntimeException(e);
+			}
+		}
+	}
 }