You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 13:41:41 UTC
[09/10] flink git commit: [FLINK-1618] [streaming] Parallel time reduce
[FLINK-1618] [streaming] Parallel time reduce
Closes #485
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1377ca97
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1377ca97
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1377ca97
Branch: refs/heads/master
Commit: 1377ca97dd0d8d1bbb7224f562dfc4f68226e02b
Parents: 221e5e6
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Mar 16 09:52:41 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 11:25:04 2015 +0100
----------------------------------------------------------------------
.../api/StreamingJobGraphGenerator.java | 28 +--
.../api/datastream/DiscretizedStream.java | 91 +++++++-
.../api/datastream/WindowedDataStream.java | 94 ++++++--
.../api/invokable/ChainableInvokable.java | 10 +
.../streaming/api/invokable/SinkInvokable.java | 2 +-
.../api/invokable/operator/FilterInvokable.java | 2 +-
.../invokable/operator/FlatMapInvokable.java | 2 +-
.../api/invokable/operator/MapInvokable.java | 2 +-
.../operator/StreamReduceInvokable.java | 2 +-
.../operator/windowing/EmptyWindowFilter.java | 32 +++
.../windowing/ParallelGroupedMerge.java | 41 ++++
.../operator/windowing/ParallelMerge.java | 142 +++++++++++
.../windowing/ParallelWindowPartitioner.java | 84 -------
.../operator/windowing/StreamDiscretizer.java | 13 +-
.../windowing/WindowBufferInvokable.java | 3 +-
.../operator/windowing/WindowMapper.java | 1 +
.../operator/windowing/WindowPartExtractor.java | 55 +++++
.../operator/windowing/WindowPartitioner.java | 6 +-
.../streaming/api/windowing/StreamWindow.java | 39 ++--
.../api/windowing/StreamWindowSerializer.java | 33 +--
.../streaming/api/windowing/WindowUtils.java | 7 +-
.../windowbuffer/BasicWindowBuffer.java | 13 +-
.../windowbuffer/CompletePreAggregator.java | 27 ---
.../windowing/windowbuffer/PreAggregator.java | 27 +++
.../windowbuffer/SlidingPreReducer.java | 16 +-
.../windowbuffer/TumblingGroupedPreReducer.java | 16 +-
.../windowbuffer/TumblingPreReducer.java | 17 +-
.../windowing/windowbuffer/WindowBuffer.java | 33 ++-
.../operator/windowing/ParallelMergeTest.java | 119 ++++++++++
.../windowing/WindowIntegrationTest.java | 234 +++++++++++++------
.../operator/windowing/WindowMergerTest.java | 6 +-
.../windowing/WindowPartitionerTest.java | 8 +-
.../api/windowing/StreamWindowTest.java | 8 +-
.../TumblingGroupedPreReducerTest.java | 26 +++
34 files changed, 915 insertions(+), 324 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index 8a110bf..0146448 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -194,10 +194,12 @@ public class StreamingJobGraphGenerator {
for (StreamEdge chainable : chainedOutputs) {
outputChainedNames.add(chainedNames.get(chainable.getTargetVertex()));
}
- String returnOperatorName = operatorName + " -> (" + StringUtils.join(outputChainedNames, ", ") + ")";
+ String returnOperatorName = operatorName + " -> ("
+ + StringUtils.join(outputChainedNames, ", ") + ")";
return returnOperatorName;
} else if (chainedOutputs.size() == 1) {
- String returnOperatorName = operatorName + " -> " + chainedNames.get(chainedOutputs.get(0).getTargetVertex());
+ String returnOperatorName = operatorName + " -> "
+ + chainedNames.get(chainedOutputs.get(0).getTargetVertex());
return returnOperatorName;
} else {
return operatorName;
@@ -215,8 +217,7 @@ public class StreamingJobGraphGenerator {
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Parallelism set: {} for {}", streamGraph.getParallelism(vertexID),
- vertexID);
+ LOG.debug("Parallelism set: {} for {}", streamGraph.getParallelism(vertexID), vertexID);
}
if (streamGraph.getInputFormat(vertexID) != null) {
@@ -263,7 +264,8 @@ public class StreamingJobGraphGenerator {
allOutputs.addAll(nonChainableOutputs);
for (StreamEdge output : allOutputs) {
- config.setSelectedNames(output.getTargetVertex(), streamGraph.getEdge(vertexID, output.getTargetVertex()).getSelectedNames());
+ config.setSelectedNames(output.getTargetVertex(),
+ streamGraph.getEdge(vertexID, output.getTargetVertex()).getSelectedNames());
}
vertexConfigs.put(vertexID, config);
@@ -302,15 +304,15 @@ public class StreamingJobGraphGenerator {
StreamInvokable<?, ?> headInvokable = streamGraph.getInvokable(vertexID);
StreamInvokable<?, ?> outInvokable = streamGraph.getInvokable(outName);
- return
- streamGraph.getInEdges(outName).size() == 1
- && outInvokable != null
- && outInvokable.getChainingStrategy() == ChainingStrategy.ALWAYS
- && (headInvokable.getChainingStrategy() == ChainingStrategy.HEAD || headInvokable
+ return streamGraph.getInEdges(outName).size() == 1
+ && outInvokable != null
+ && outInvokable.getChainingStrategy() == ChainingStrategy.ALWAYS
+ && (headInvokable.getChainingStrategy() == ChainingStrategy.HEAD || headInvokable
.getChainingStrategy() == ChainingStrategy.ALWAYS)
- && edge.getPartitioner().getStrategy() == PartitioningStrategy.FORWARD
- && streamGraph.getParallelism(vertexID) == streamGraph.getParallelism(outName)
- && streamGraph.chaining;
+ && (edge.getPartitioner().getStrategy() == PartitioningStrategy.FORWARD || streamGraph
+ .getParallelism(outName) == 1)
+ && streamGraph.getParallelism(vertexID) == streamGraph.getParallelism(outName)
+ && streamGraph.chaining;
}
private void setSlotSharing() {
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index 6526aa6..7597b47 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -18,21 +18,31 @@
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.function.WindowMapFunction;
+import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
+import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.windowing.EmptyWindowFilter;
+import org.apache.flink.streaming.api.invokable.operator.windowing.ParallelGroupedMerge;
+import org.apache.flink.streaming.api.invokable.operator.windowing.ParallelMerge;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowFlattener;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowFolder;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMapper;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMerger;
+import org.apache.flink.streaming.api.invokable.operator.windowing.WindowPartExtractor;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowPartitioner;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowReducer;
import org.apache.flink.streaming.api.windowing.StreamWindow;
@@ -91,6 +101,56 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
}
}
+ /**
+ * This method implements the parallel time reduce logic for time windows
+ *
+ * @param reduceFunction
+ * The reduce function to be applied on the windows
+ * @param isPreAggregated
+ * Flag whether the window buffer was a pre-aggregator or not
+ * @return
+ */
+ protected DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> reduceFunction,
+ boolean isPreAggregated) {
+
+ // We partition the windowed stream if it is not already pre-aggregated
+ DiscretizedStream<OUT> partitioned = isPreAggregated ? this : partition(transformation);
+
+ // Since we also emit the empty windows for bookkeeping, we need to
+ // filter them out
+ DiscretizedStream<OUT> nonEmpty = filterEmpty(partitioned);
+
+ // We extract the number of parts from each window we will merge using
+ // this afterwards
+ DataStream<Tuple2<Integer, Integer>> numOfParts = extractPartsByID(partitioned);
+
+ // We reduce the windows if not pre-aggregated
+ DiscretizedStream<OUT> reduced = isPreAggregated ? nonEmpty : nonEmpty.transform(
+ WindowTransformation.REDUCEWINDOW, "Window Reduce", nonEmpty.getType(),
+ new WindowReducer<OUT>(reduceFunction));
+
+ // We merge the windows by the number of parts
+ return wrap(parallelMerge(numOfParts, reduced, reduceFunction), false);
+
+ }
+
+ private SingleOutputStreamOperator<StreamWindow<OUT>, ?> parallelMerge(
+ DataStream<Tuple2<Integer, Integer>> numOfParts, DiscretizedStream<OUT> reduced,
+ ReduceFunction<OUT> reduceFunction) {
+
+ CoFlatMapFunction<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>> parallelMerger = isGrouped() ? new ParallelGroupedMerge<OUT>()
+ : new ParallelMerge<OUT>(reduceFunction);
+
+ return reduced.discretizedStream
+ .groupBy(new WindowKey<OUT>())
+ .connect(numOfParts.groupBy(0))
+ .addCoFunction(
+ "CoFlatMap",
+ reduced.discretizedStream.getType(),
+ new CoFlatMapInvokable<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>>(
+ parallelMerger));
+ }
+
@Override
public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) {
@@ -128,6 +188,23 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
invokable), transformation);
}
+ private DiscretizedStream<OUT> filterEmpty(DiscretizedStream<OUT> input) {
+ return wrap(input.discretizedStream.transform("Filter", input.discretizedStream.getType(),
+ new FilterInvokable<StreamWindow<OUT>>(new EmptyWindowFilter<OUT>())
+ .withoutInputCopy()), input.isPartitioned);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private DataStream<Tuple2<Integer, Integer>> extractPartsByID(DiscretizedStream<OUT> input) {
+ return input.discretizedStream
+ .transform(
+ "ExtractParts",
+ new TupleTypeInfo(Tuple2.class, BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO),
+ new FlatMapInvokable<StreamWindow<OUT>, Tuple2<Integer, Integer>>(
+ new WindowPartExtractor<OUT>()).withoutInputCopy());
+ }
+
private DiscretizedStream<OUT> partition(WindowTransformation transformation) {
int parallelism = discretizedStream.getParallelism();
@@ -173,13 +250,15 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
}
- @SuppressWarnings("rawtypes")
- private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator stream, boolean isPartitioned) {
- return wrap(stream, transformation);
+ @SuppressWarnings("unchecked")
+ private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator<StreamWindow<R>, ?> stream,
+ boolean isPartitioned) {
+ return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.groupByKey,
+ transformation, isPartitioned);
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
- private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator stream,
+ @SuppressWarnings("unchecked")
+ private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator<StreamWindow<R>, ?> stream,
WindowTransformation transformation) {
return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.groupByKey,
transformation, isPartitioned);
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index c80546f..73cbdfd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -58,7 +58,7 @@ import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.CompletePreAggregator;
+import org.apache.flink.streaming.api.windowing.windowbuffer.PreAggregator;
import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedPreReducer;
@@ -262,17 +262,25 @@ public class WindowedDataStream<OUT> {
*/
public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
- WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
- .with(clean(reduceFunction));
+ // We check whether we should apply parallel time discretization, which
+ // is a more complex exploiting the monotonic properties of time
+ // policies
+ if (WindowUtils.isTimeOnly(getTrigger(), getEviction()) && discretizerKey == null
+ && dataStream.getParallelism() > 1) {
+ return timeReduce(reduceFunction);
+ } else {
+ WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
+ .with(clean(reduceFunction));
- WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation);
+ WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation);
- DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer);
+ DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer);
- if (windowBuffer instanceof CompletePreAggregator) {
- return discretized;
- } else {
- return discretized.reduceWindow(reduceFunction);
+ if (windowBuffer instanceof PreAggregator) {
+ return discretized;
+ } else {
+ return discretized.reduceWindow(reduceFunction);
+ }
}
}
@@ -377,25 +385,76 @@ public class WindowedDataStream<OUT> {
int parallelism = getDiscretizerParallelism(transformation);
return new DiscretizedStream<OUT>(dataStream
- .transform("Stream Discretizer", bufferEventType, discretizer)
+ .transform(discretizer.getClass().getSimpleName(), bufferEventType, discretizer)
.setParallelism(parallelism)
- .transform("WindowBuffer", new StreamWindowTypeInfo<OUT>(getType()),
- bufferInvokable).setParallelism(parallelism), groupByKey, transformation,
- false);
+ .transform(windowBuffer.getClass().getSimpleName(),
+ new StreamWindowTypeInfo<OUT>(getType()), bufferInvokable)
+ .setParallelism(parallelism), groupByKey, transformation, false);
}
+ /**
+ * Returns the degree of parallelism for the stream discretizer. The
+ * returned parallelism is either 1 for for non-parallel global policies (or
+ * when the input stream is non-parallel), environment parallelism for the
+ * policies that can run in parallel (such as, any ditributed policy, reduce
+ * by count or time).
+ *
+ * @param transformation
+ * The applied transformation
+ * @return The parallelism for the stream discretizer
+ */
private int getDiscretizerParallelism(WindowTransformation transformation) {
return isLocal
|| (transformation == WindowTransformation.REDUCEWINDOW && WindowUtils
.isParallelPolicy(getTrigger(), getEviction(), dataStream.getParallelism()))
|| (discretizerKey != null) ? dataStream.environment.getDegreeOfParallelism() : 1;
+
+ }
+
+ /**
+ * Dedicated method for applying parallel time reduce transformations on
+ * windows
+ *
+ * @param reduceFunction
+ * Reduce function to apply
+ * @return The transformed stream
+ */
+ protected DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> reduceFunction) {
+
+ WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
+ .with(clean(reduceFunction));
+
+ // We get the windowbuffer and set it to emit empty windows with
+ // sequential IDs. This logic is necessarry to merge windows created in
+ // parallel.
+ WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation).emitEmpty().sequentialID();
+
+ // If there is a groupby for the reduce operation we apply it before the
+ // discretizers, because we will forward everything afterwards to
+ // exploit task chaining
+ if (groupByKey != null) {
+ dataStream = dataStream.groupBy(groupByKey);
+ }
+
+ // We discretize the stream and call the timeReduce function of the
+ // discretized stream, we also pass the type of the windowbuffer
+ DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer);
+
+ return discretized
+ .timeReduce(reduceFunction, windowBuffer instanceof PreAggregator);
+
}
+ /**
+ * Based on the defined policies, returns the stream discretizer to be used
+ */
private StreamInvokable<OUT, WindowEvent<OUT>> getDiscretizer() {
if (discretizerKey == null) {
return new StreamDiscretizer<OUT>(getTrigger(), getEviction());
} else if (WindowUtils.isSystemTimeTrigger(getTrigger())) {
+ // We return a special more efficient grouped discretizer for system
+ // time policies to avoid lunching multiple threads
return new GroupedTimeDiscretizer<OUT>(discretizerKey,
(TimeTriggerPolicy<OUT>) getTrigger(),
(CloneableEvictionPolicy<OUT>) getEviction());
@@ -416,6 +475,13 @@ public class WindowedDataStream<OUT> {
}
}
+ /**
+ * Based on the given policies returns the WindowBuffer used to store the
+ * elements in the window. This is the module that also encapsulates the
+ * pre-aggregator logic when it is applicable, reducing the space cost, and
+ * trigger latency.
+ *
+ */
@SuppressWarnings("unchecked")
private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation) {
TriggerPolicy<OUT> trigger = getTrigger();
@@ -704,7 +770,7 @@ public class WindowedDataStream<OUT> {
if (evictionHelper != null) {
return evictionHelper.toEvict();
- } else if (userEvicter == null) {
+ } else if (userEvicter == null || userEvicter instanceof TumblingEvictionPolicy) {
if (triggerHelper instanceof Time) {
return triggerHelper.toEvict();
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
index 24c0319..470fc81 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
@@ -25,6 +25,7 @@ public abstract class ChainableInvokable<IN, OUT> extends StreamInvokable<IN, OU
Collector<IN> {
private static final long serialVersionUID = 1L;
+ private boolean copyInput = true;
public ChainableInvokable(Function userFunction) {
super(userFunction);
@@ -36,4 +37,13 @@ public abstract class ChainableInvokable<IN, OUT> extends StreamInvokable<IN, OU
this.inSerializer = inSerializer;
this.objectSerializer = inSerializer.getObjectSerializer();
}
+
+ public ChainableInvokable<IN, OUT> withoutInputCopy() {
+ copyInput = false;
+ return this;
+ }
+
+ protected IN copyInput(IN input) {
+ return copyInput ? copy(input) : input;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 35060fd..2c6b6e6 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -43,7 +43,7 @@ public class SinkInvokable<IN> extends ChainableInvokable<IN, IN> {
@Override
public void collect(IN record) {
- nextObject = copy(record);
+ nextObject = copyInput(record);
callUserFunctionAndLogException();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index ab3f147..610fa53 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -50,7 +50,7 @@ public class FilterInvokable<IN> extends ChainableInvokable<IN, IN> {
@Override
public void collect(IN record) {
if (isRunning) {
- nextObject = copy(record);
+ nextObject = copyInput(record);
callUserFunctionAndLogException();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 025bd32..436cf4e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -45,7 +45,7 @@ public class FlatMapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
@Override
public void collect(IN record) {
if (isRunning) {
- nextObject = copy(record);
+ nextObject = copyInput(record);
callUserFunctionAndLogException();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 8fc1f13..9647144 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -45,7 +45,7 @@ public class MapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
@Override
public void collect(IN record) {
if (isRunning) {
- nextObject = copy(record);
+ nextObject = copyInput(record);
callUserFunctionAndLogException();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index e7fa2b1..fe58105 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -63,7 +63,7 @@ public class StreamReduceInvokable<IN> extends ChainableInvokable<IN, IN> {
@Override
public void collect(IN record) {
if (isRunning) {
- nextObject = copy(record);
+ nextObject = copyInput(record);
callUserFunctionAndLogException();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/EmptyWindowFilter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/EmptyWindowFilter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/EmptyWindowFilter.java
new file mode 100644
index 0000000..0f2ee31
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/EmptyWindowFilter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+
+public class EmptyWindowFilter<OUT> implements FilterFunction<StreamWindow<OUT>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean filter(StreamWindow<OUT> value) throws Exception {
+ return !value.isEmpty();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelGroupedMerge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelGroupedMerge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelGroupedMerge.java
new file mode 100644
index 0000000..737485f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelGroupedMerge.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+
+/**
+ * The version of the ParallelMerge CoFlatMap that does not reduce the incoming
+ * elements only appends them to the current window. This is necessary for
+ * grouped reduces.
+ */
+public class ParallelGroupedMerge<OUT> extends ParallelMerge<OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ public ParallelGroupedMerge() {
+ super(null);
+ }
+
+ @Override
+ protected void updateCurrent(StreamWindow<OUT> current, StreamWindow<OUT> nextWindow)
+ throws Exception {
+ current.addAll(nextWindow);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMerge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMerge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMerge.java
new file mode 100644
index 0000000..8ffca91
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMerge.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.co.RichCoFlatMapFunction;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.util.Collector;
+
+/**
+ * Class that encapsulates the functionality necessary to merge windows created
+ * in parallel. This CoFlatMap uses the information received on the number of
+ * parts for each window to merge the different parts. It waits until it
+ * receives an indication on the number of parts from all the discretizers
+ * before producing any output.
+ */
+public class ParallelMerge<OUT> extends
+ RichCoFlatMapFunction<StreamWindow<OUT>, Tuple2<Integer, Integer>, StreamWindow<OUT>> {
+
+ private static final long serialVersionUID = 1L;
+
+ protected Integer numberOfDiscretizers;
+ private ReduceFunction<OUT> reducer;
+
+ private Map<Integer, Integer> availableNumberOfParts = new HashMap<Integer, Integer>();
+ private Map<Integer, Tuple2<StreamWindow<OUT>, Integer>> receivedWindows = new HashMap<Integer, Tuple2<StreamWindow<OUT>, Integer>>();
+ private Map<Integer, Tuple2<Integer, Integer>> receivedNumberOfParts = new HashMap<Integer, Tuple2<Integer, Integer>>();
+
+ public ParallelMerge(ReduceFunction<OUT> reducer) {
+ this.reducer = reducer;
+ }
+
+ @Override
+ public void flatMap1(StreamWindow<OUT> nextWindow, Collector<StreamWindow<OUT>> out)
+ throws Exception {
+
+ Integer id = nextWindow.windowID;
+
+ Tuple2<StreamWindow<OUT>, Integer> current = receivedWindows.get(id);
+
+ if (current == null) {
+ current = new Tuple2<StreamWindow<OUT>, Integer>(nextWindow, 1);
+ } else {
+ updateCurrent(current.f0, nextWindow);
+ current.f1++;
+ }
+
+ Integer count = current.f1;
+
+ if (availableNumberOfParts.containsKey(id) && availableNumberOfParts.get(id) <= count) {
+ out.collect(current.f0);
+ receivedWindows.remove(id);
+ availableNumberOfParts.remove(id);
+
+ checkOld(id);
+
+ } else {
+ receivedWindows.put(id, (Tuple2<StreamWindow<OUT>, Integer>) current);
+ }
+ }
+
+ private void checkOld(Integer id) {
+ // In case we have remaining partial windows (which indicates errors in
+ // processing), output and log them
+ if (receivedWindows.containsKey(id - 1)) {
+ throw new RuntimeException("Error in processing logic, window with id " + id
+ + " should have already been processed");
+ }
+
+ }
+
+ @Override
+ public void flatMap2(Tuple2<Integer, Integer> partInfo, Collector<StreamWindow<OUT>> out)
+ throws Exception {
+
+ Integer id = partInfo.f0;
+ Integer numOfParts = partInfo.f1;
+
+ Tuple2<Integer, Integer> currentPartInfo = receivedNumberOfParts.get(id);
+ if (currentPartInfo != null) {
+ currentPartInfo.f0 += numOfParts;
+ currentPartInfo.f1++;
+ } else {
+ currentPartInfo = new Tuple2<Integer, Integer>(numOfParts, 1);
+ receivedNumberOfParts.put(id, currentPartInfo);
+ }
+
+ if (currentPartInfo.f1 >= numberOfDiscretizers) {
+ receivedNumberOfParts.remove(id);
+
+ Tuple2<StreamWindow<OUT>, Integer> current = receivedWindows.get(id);
+
+ Integer count = current != null ? current.f1 : -1;
+
+ if (count >= currentPartInfo.f0) {
+ out.collect(current.f0);
+ receivedWindows.remove(id);
+ checkOld(id);
+ } else if (currentPartInfo.f0 > 0) {
+ availableNumberOfParts.put(id, currentPartInfo.f1);
+ }
+ }
+
+ }
+
+ protected void updateCurrent(StreamWindow<OUT> current, StreamWindow<OUT> nextWindow)
+ throws Exception {
+ if (current.size() != 1 || nextWindow.size() != 1) {
+ throw new RuntimeException(
+ "Error in parallel merge logic. Current window should contain only one element.");
+ }
+ OUT currentReduced = current.remove(0);
+ currentReduced = reducer.reduce(currentReduced, nextWindow.get(0));
+ current.add(currentReduced);
+ }
+
+ @Override
+ public void open(Configuration conf) {
+ this.numberOfDiscretizers = getRuntimeContext().getNumberOfParallelSubtasks();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelWindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelWindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelWindowPartitioner.java
deleted file mode 100644
index 32778da..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelWindowPartitioner.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * This invokable applies either split or key partitioning depending on the
- * transformation.
- * <p>
- * Every incoming window is first given the next sequential window ID. It is
- * then partitioned by key when a key selector was supplied. Otherwise it is
- * split into {@code numberOfSplits} sub-windows, or forwarded unchanged when
- * {@code numberOfSplits <= 1}.
- */
-public class ParallelWindowPartitioner<T> extends
- ChainableInvokable<StreamWindow<T>, StreamWindow<T>> {
-
- private KeySelector<T, ?> keySelector;
- private int numberOfSplits;
- // Last window ID assigned; pre-incremented, so the first window gets ID 1.
- private int currentWindowID = 0;
-
- /** Creates a partitioner that emits one sub-window per extracted key. */
- public ParallelWindowPartitioner(KeySelector<T, ?> keySelector) {
- super(null);
- this.keySelector = keySelector;
- }
-
- /** Creates a partitioner that splits each window into {@code numberOfSplits} parts. */
- public ParallelWindowPartitioner(int numberOfSplits) {
- super(null);
- this.numberOfSplits = numberOfSplits;
- }
-
- private static final long serialVersionUID = 1L;
-
- /** Processes input records until stopped or until no further record can be read. */
- @Override
- public void invoke() throws Exception {
- while (isRunning && readNext() != null) {
- callUserFunctionAndLogException();
- }
- }
-
- /** Assigns a new window ID, then forwards, splits or key-partitions the window. */
- @Override
- protected void callUserFunction() throws Exception {
- StreamWindow<T> currentWindow = nextObject;
- currentWindow.setID(++currentWindowID);
-
- if (keySelector == null) {
- if (numberOfSplits <= 1) {
- collector.collect(currentWindow);
- } else {
- for (StreamWindow<T> window : currentWindow.split(numberOfSplits)) {
- collector.collect(window);
- }
- }
- } else {
-
- for (StreamWindow<T> window : currentWindow.partitionBy(keySelector)) {
- collector.collect(window);
- }
-
- }
- }
-
- /** Processes a record handed over directly through collect(), while running. */
- @Override
- public void collect(StreamWindow<T> record) {
- if (isRunning) {
- nextObject = record;
- callUserFunctionAndLogException();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
index e668b66..30512e6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
@@ -25,13 +25,11 @@ import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
/**
* This invokable represents the discretization step of a window transformation.
* The user supplied eviction and trigger policies are applied to create the
* {@link StreamWindow} that will be further transformed in the next stages.
- * </p> To allow pre-aggregations supply an appropriate {@link WindowBuffer}
*/
public class StreamDiscretizer<IN> extends StreamInvokable<IN, WindowEvent<IN>> {
@@ -129,11 +127,6 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, WindowEvent<IN>>
emitWindow();
}
- protected synchronized void externalTriggerOnFakeElement(Object input) {
- emitWindow();
- activeEvict(input);
- }
-
/**
* This method emits the content of the buffer as a new {@link StreamWindow}
* if not empty
@@ -175,7 +168,7 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, WindowEvent<IN>>
ActiveTriggerPolicy<IN> tp = (ActiveTriggerPolicy<IN>) triggerPolicy;
Runnable runnable = tp.createActiveTriggerRunnable(new WindowingCallback());
- if (activePolicyThread != null) {
+ if (runnable != null) {
activePolicyThread = new Thread(runnable);
activePolicyThread.start();
}
@@ -216,7 +209,7 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, WindowEvent<IN>>
@Override
public String toString() {
- return "Discretizer(Trigger: " + triggerPolicy.toString() + ", Eviction: " + evictionPolicy.toString()
- + ")";
+ return "Discretizer(Trigger: " + triggerPolicy.toString() + ", Eviction: "
+ + evictionPolicy.toString() + ")";
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
index 75f7d9d..475611f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
@@ -23,8 +23,7 @@ import org.apache.flink.streaming.api.windowing.WindowEvent;
import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
/**
- * This invokable flattens the results of the window transformations by
- * outputing the elements of the {@link StreamWindow} one-by-one
+ * This invokable manages the window buffers attached to the discretizers.
*/
public class WindowBufferInvokable<T> extends ChainableInvokable<WindowEvent<T>, StreamWindow<T>> {
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
index 3dfd59d..9578a70 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
@@ -50,6 +50,7 @@ public class WindowMapper<IN, OUT> extends MapInvokable<StreamWindow<IN>, Stream
@Override
public StreamWindow<R> map(StreamWindow<T> window) throws Exception {
StreamWindow<R> outputWindow = new StreamWindow<R>(window.windowID);
+
outputWindow.numberOfParts = window.numberOfParts;
mapper.mapWindow(window, outputWindow);
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartExtractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartExtractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartExtractor.java
new file mode 100644
index 0000000..416b915
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartExtractor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.util.Collector;
+
+/**
+ * This FlatMapFunction is used to send the number of parts for each window ID
+ * (for each parallel discretizer) to the parallel merger that will use it to
+ * merge parallel discretized windows.
+ * <p>
+ * Only the first window of a run of consecutive windows with the same ID (the
+ * parts of one partitioned window) produces output. Empty windows are reported
+ * with 0 parts.
+ */
+public class WindowPartExtractor<OUT> implements FlatMapFunction<StreamWindow<OUT>, Tuple2<Integer, Integer>> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** ID of the last window a part count was emitted for; null before the first window. */
+    Integer lastIndex = null;
+
+    @Override
+    public void flatMap(StreamWindow<OUT> value, Collector<Tuple2<Integer, Integer>> out)
+            throws Exception {
+
+        // We don't emit new values for the same index. This avoids sending the
+        // same information for the same partitioned window multiple times.
+        // The null sentinel (instead of -1) guarantees that the first window is
+        // always reported, whatever its ID.
+        if (lastIndex == null || value.windowID != lastIndex) {
+
+            // For empty windows we send 0 since these windows will be filtered
+            // out
+            if (value.isEmpty()) {
+                out.collect(new Tuple2<Integer, Integer>(value.windowID, 0));
+            } else {
+                out.collect(new Tuple2<Integer, Integer>(value.windowID, value.numberOfParts));
+            }
+            lastIndex = value.windowID;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
index 846650d..0a28d99 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
@@ -56,13 +56,14 @@ public class WindowPartitioner<T> extends ChainableInvokable<StreamWindow<T>, St
if (numberOfSplits <= 1) {
collector.collect(currentWindow);
} else {
- for (StreamWindow<T> window : currentWindow.split(numberOfSplits)) {
+ for (StreamWindow<T> window : StreamWindow.split(currentWindow, numberOfSplits)) {
collector.collect(window);
}
}
} else {
- for (StreamWindow<T> window : currentWindow.partitionBy(keySelector)) {
+ for (StreamWindow<T> window : StreamWindow
+ .partitionBy(currentWindow, keySelector, true)) {
collector.collect(window);
}
@@ -76,5 +77,4 @@ public class WindowPartitioner<T> extends ChainableInvokable<StreamWindow<T>, St
callUserFunctionAndLogException();
}
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
index b45babb..ee2ea06 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
@@ -42,7 +42,6 @@ public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
private static Random rnd = new Random();
public int windowID;
-
public int numberOfParts;
/**
@@ -104,27 +103,33 @@ public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
* Partitions the window using the given keyselector. A subwindow will be
* created for each key.
*
+ * @param streamWindow
+ * StreamWindow instance to partition
* @param keySelector
* The keyselector used for extracting keys.
+ * @param withKey
+ * Flag to decide whether the key object should be included in
+ * the created window
* @return A list of the subwindows
*/
- public List<StreamWindow<T>> partitionBy(KeySelector<T, ?> keySelector) throws Exception {
- Map<Object, StreamWindow<T>> partitions = new HashMap<Object, StreamWindow<T>>();
+ public static <X> List<StreamWindow<X>> partitionBy(StreamWindow<X> streamWindow,
+ KeySelector<X, ?> keySelector, boolean withKey) throws Exception {
+ Map<Object, StreamWindow<X>> partitions = new HashMap<Object, StreamWindow<X>>();
- for (T value : this) {
+ for (X value : streamWindow) {
Object key = keySelector.getKey(value);
- StreamWindow<T> window = partitions.get(key);
+ StreamWindow<X> window = partitions.get(key);
if (window == null) {
- window = new StreamWindow<T>(this.windowID, 0);
+ window = new StreamWindow<X>(streamWindow.windowID, 0);
partitions.put(key, window);
}
window.add(value);
}
- List<StreamWindow<T>> output = new ArrayList<StreamWindow<T>>();
+ List<StreamWindow<X>> output = new ArrayList<StreamWindow<X>>();
int numkeys = partitions.size();
- for (StreamWindow<T> window : partitions.values()) {
+ for (StreamWindow<X> window : partitions.values()) {
output.add(window.setNumberOfParts(numkeys));
}
@@ -134,30 +139,32 @@ public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
/**
* Splits the window into n equal (if possible) sizes.
*
+ * @param window
+ * Window to split
* @param n
* Number of desired partitions
* @return The list of subwindows.
*/
- public List<StreamWindow<T>> split(int n) {
- int numElements = size();
+ public static <X> List<StreamWindow<X>> split(StreamWindow<X> window, int n) {
+ int numElements = window.size();
if (n == 0) {
- return new ArrayList<StreamWindow<T>>();
+ return new ArrayList<StreamWindow<X>>();
}
if (n > numElements) {
- return split(numElements);
+ return split(window, numElements);
} else {
- List<StreamWindow<T>> split = new ArrayList<StreamWindow<T>>();
+ List<StreamWindow<X>> split = new ArrayList<StreamWindow<X>>();
int splitSize = numElements / n;
int index = -1;
- StreamWindow<T> currentSubWindow = new StreamWindow<T>(windowID, n);
+ StreamWindow<X> currentSubWindow = new StreamWindow<X>(window.windowID, n);
split.add(currentSubWindow);
- for (T element : this) {
+ for (X element : window) {
index++;
if (index == splitSize && split.size() < n) {
- currentSubWindow = new StreamWindow<T>(windowID, n);
+ currentSubWindow = new StreamWindow<X>(window.windowID, n);
split.add(currentSubWindow);
index = 0;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
index e8945f1..229cb4a 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -33,6 +34,7 @@ public final class StreamWindowSerializer<T> extends TypeSerializer<StreamWindow
private final TypeSerializer<T> typeSerializer;
TypeSerializer<Integer> intSerializer = IntSerializer.INSTANCE;
+ TypeSerializer<Boolean> boolSerializer = BooleanSerializer.INSTANCE;
public StreamWindowSerializer(TypeInformation<T> typeInfo, ExecutionConfig conf) {
this.typeSerializer = typeInfo.createSerializer(conf);
@@ -62,7 +64,6 @@ public final class StreamWindowSerializer<T> extends TypeSerializer<StreamWindow
reuse.clear();
reuse.windowID = from.windowID;
reuse.numberOfParts = from.numberOfParts;
-
for (T element : from) {
reuse.add(typeSerializer.copy(element));
}
@@ -75,31 +76,21 @@ public final class StreamWindowSerializer<T> extends TypeSerializer<StreamWindow
}
@Override
- public void serialize(StreamWindow<T> value, DataOutputView target) throws IOException {
+ public void serialize(StreamWindow<T> window, DataOutputView target) throws IOException {
+
+ intSerializer.serialize(window.windowID, target);
+ intSerializer.serialize(window.numberOfParts, target);
- intSerializer.serialize(value.windowID, target);
- intSerializer.serialize(value.numberOfParts, target);
- intSerializer.serialize(value.size(), target);
+ intSerializer.serialize(window.size(), target);
- for (T element : value) {
+ for (T element : window) {
typeSerializer.serialize(element, target);
}
}
@Override
public StreamWindow<T> deserialize(DataInputView source) throws IOException {
- StreamWindow<T> window = createInstance();
-
- window.windowID = intSerializer.deserialize(source);
- window.numberOfParts = intSerializer.deserialize(source);
-
- int size = intSerializer.deserialize(source);
-
- for (int i = 0; i < size; i++) {
- window.add(typeSerializer.deserialize(source));
- }
-
- return window;
+ return deserialize(createInstance(), source);
}
@Override
@@ -109,10 +100,10 @@ public final class StreamWindowSerializer<T> extends TypeSerializer<StreamWindow
StreamWindow<T> window = reuse;
window.clear();
- window.windowID = source.readInt();
- window.numberOfParts = source.readInt();
+ window.windowID = intSerializer.deserialize(source);
+ window.numberOfParts = intSerializer.deserialize(source);
- int size = source.readInt();
+ int size = intSerializer.deserialize(source);
for (int i = 0; i < size; i++) {
window.add(typeSerializer.deserialize(source));
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
index 8411d31..034c7d7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
@@ -45,9 +45,10 @@ public class WindowUtils {
}
public static boolean isParallelPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction,
- int inputParallelism) {
- return inputParallelism != 1
- && ((eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy)) || (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy));
+ int parallelism) {
+ return ((eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy))
+ || (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy) || (WindowUtils
+ .isTimeOnly(trigger, eviction) && parallelism > 1));
}
public static boolean isSlidingTimePolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
index 8e39398..371e20d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
@@ -27,7 +27,7 @@ import org.apache.flink.util.Collector;
* Basic window buffer that stores the elements in a simple list without any
* pre-aggregation.
*/
-public class BasicWindowBuffer<T> implements WindowBuffer<T> {
+public class BasicWindowBuffer<T> extends WindowBuffer<T> {
private static final long serialVersionUID = 1L;
protected LinkedList<T> buffer;
@@ -36,15 +36,12 @@ public class BasicWindowBuffer<T> implements WindowBuffer<T> {
this.buffer = new LinkedList<T>();
}
- public boolean emitWindow(Collector<StreamWindow<T>> collector) {
- if (!buffer.isEmpty()) {
- StreamWindow<T> currentWindow = new StreamWindow<T>();
+ public void emitWindow(Collector<StreamWindow<T>> collector) {
+ if (emitEmpty || !buffer.isEmpty()) {
+ StreamWindow<T> currentWindow = createEmptyWindow();
currentWindow.addAll(buffer);
collector.collect(currentWindow);
- return true;
- } else {
- return false;
- }
+ }
}
public void store(T element) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/CompletePreAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/CompletePreAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/CompletePreAggregator.java
deleted file mode 100644
index 59bd974..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/CompletePreAggregator.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-/**
- * Interface for marking window pre-aggregators that fully process the window so
- * that no further reduce step is necessary afterwards.
- * <p>
- * This is a marker interface: it declares no methods.
- */
-public interface CompletePreAggregator {
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/PreAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/PreAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/PreAggregator.java
new file mode 100644
index 0000000..1b95248
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/PreAggregator.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.windowbuffer;
+
+/**
+ * Marker interface (declares no methods) for window pre-aggregators, such as {@link TumblingPreReducer},
+ * that fully process the window so that no further reduce step is necessary afterwards.
+ */
+public interface PreAggregator {
+ // Intentionally empty: this is a pure marker type.
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
index 27f7ff5..8b9558f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.util.Collector;
-public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompletePreAggregator {
+public abstract class SlidingPreReducer<T> extends WindowBuffer<T> implements PreAggregator {
private static final long serialVersionUID = 1L;
@@ -45,18 +45,14 @@ public abstract class SlidingPreReducer<T> implements WindowBuffer<T>, CompleteP
this.serializer = serializer;
}
- public boolean emitWindow(Collector<StreamWindow<T>> collector) {
- StreamWindow<T> currentWindow = new StreamWindow<T>();
+ public void emitWindow(Collector<StreamWindow<T>> collector) {
+ StreamWindow<T> currentWindow = createEmptyWindow();
try {
- if (addFinalAggregate(currentWindow)) {
+ if (addFinalAggregate(currentWindow) || emitEmpty) {
collector.collect(currentWindow);
- afterEmit();
- return true;
- } else {
- afterEmit();
- return false;
- }
+ }
+ afterEmit();
} catch (Exception e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
index 9431a99..68f9837 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
@@ -29,7 +29,7 @@ import org.apache.flink.util.Collector;
/**
* Grouped pre-reducer for tumbling eviction polciy.
*/
-public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePreAggregator {
+public class TumblingGroupedPreReducer<T> extends WindowBuffer<T> implements PreAggregator {
private static final long serialVersionUID = 1L;
@@ -37,6 +37,7 @@ public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePr
private KeySelector<T, ?> keySelector;
private Map<Object, T> reducedValues;
+ private Map<Object, T> keyInstancePerKey = new HashMap<Object, T>();
private TypeSerializer<T> serializer;
@@ -48,16 +49,15 @@ public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePr
this.reducedValues = new HashMap<Object, T>();
}
- public boolean emitWindow(Collector<StreamWindow<T>> collector) {
+ public void emitWindow(Collector<StreamWindow<T>> collector) { // NOTE(review): keyInstancePerKey is never cleared here - confirm it cannot grow without bound across windows
if (!reducedValues.isEmpty()) {
- StreamWindow<T> currentWindow = new StreamWindow<T>();
+ StreamWindow<T> currentWindow = createEmptyWindow(); // gets a sequential ID when sequentialID() was requested
currentWindow.addAll(reducedValues.values());
collector.collect(currentWindow);
reducedValues.clear();
- return true;
- } else {
- return false;
+ } else if (emitEmpty) { // no reduced groups buffered: emit an empty window only if emitEmpty() was requested
+ collector.collect(createEmptyWindow());
}
}
@@ -74,6 +74,10 @@ public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePr
}
reducedValues.put(key, reduced);
+
+ if (emitPerGroup && !keyInstancePerKey.containsKey(key)) {
+ keyInstancePerKey.put(key, element);
+ }
}
public void evict(int n) {
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
index 58b30a6..e56e556 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
@@ -25,7 +25,7 @@ import org.apache.flink.util.Collector;
/**
* Non-grouped pre-reducer for tumbling eviction policy.
*/
-public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggregator {
+public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggregator {
private static final long serialVersionUID = 1L;
@@ -39,15 +39,14 @@ public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggreg
this.serializer = serializer;
}
- public boolean emitWindow(Collector<StreamWindow<T>> collector) {
+ public void emitWindow(Collector<StreamWindow<T>> collector) {
if (reduced != null) {
- StreamWindow<T> currentWindow = new StreamWindow<T>();
+ StreamWindow<T> currentWindow = createEmptyWindow(); // gets a sequential ID when sequentialID() was requested
currentWindow.add(reduced);
collector.collect(currentWindow);
reduced = null;
- return true;
- } else {
- return false;
+ } else if (emitEmpty) { // nothing reduced (reduced == null): emit an empty window only if emitEmpty() was requested
+ collector.collect(createEmptyWindow());
}
}
@@ -72,4 +71,10 @@ public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggreg
return reduced.toString();
}
+ @Override
+ public WindowBuffer<T> emitEmpty() {
+ // Same effect as the inherited WindowBuffer.emitEmpty(): set the emitEmpty flag and return this.
+ return super.emitEmpty();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
index 2dd50db..5c5ea52 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
@@ -23,17 +23,38 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.util.Collector;
/**
- * Interface for defining specialized buffers to store/emit window data.
+ * Class for defining specialized buffers to store/emit window data.
- * Pre-aggregators should be implemented using this interface.
+ * Pre-aggregators should be implemented by extending this class.
*/
-public interface WindowBuffer<T> extends Serializable, Cloneable {
+public abstract class WindowBuffer<T> implements Serializable, Cloneable {
- public void store(T element) throws Exception;
+ private static final long serialVersionUID = 1L;
- public void evict(int n);
+ protected Integer nextID = 1; // ID of the next window from createEmptyWindow() when sequentialID is set; NOTE(review): boxed Integer - a plain int would avoid autoboxing on nextID++
+ protected boolean sequentialID = false; // enabled by sequentialID()
+ protected boolean emitEmpty = false; // enabled by emitEmpty(); checked by the emitWindow() implementations
+ protected boolean emitPerGroup = false; // no setter in this class; read by TumblingGroupedPreReducer - TODO confirm where it is enabled
- public boolean emitWindow(Collector<StreamWindow<T>> collector);
+ public abstract void store(T element) throws Exception;
- public WindowBuffer<T> clone();
+ public abstract void evict(int n);
+ // Emits the buffered window to the collector; implementations build it via createEmptyWindow() and emit an empty one only when emitEmpty is set.
+ public abstract void emitWindow(Collector<StreamWindow<T>> collector);
+ // NOTE(review): implementations should presumably carry over the flags and nextID declared here - confirm.
+ public abstract WindowBuffer<T> clone();
+ // Requests that a window be emitted even when there is nothing to aggregate; returns this for chaining.
+ public WindowBuffer<T> emitEmpty() {
+ emitEmpty = true;
+ return this;
+ }
+ // Makes createEmptyWindow() assign consecutive IDs starting at 1; returns this for chaining.
+ public WindowBuffer<T> sequentialID() {
+ sequentialID = true;
+ return this;
+ }
+ // Creates the window to emit: numbered from nextID when sequentialID is set, otherwise via the no-arg constructor.
+ protected StreamWindow<T> createEmptyWindow() {
+ return sequentialID ? new StreamWindow<T>(nextID++) : new StreamWindow<T>();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1377ca97/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMergeTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMergeTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMergeTest.java
new file mode 100644
index 0000000..d892c48
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/ParallelMergeTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.junit.Test;
+
+public class ParallelMergeTest {
+ // Drives ParallelMerge directly: flatMap1 receives window parts, flatMap2 receives Tuple2 notifications.
+ @Test
+ public void nonGroupedTest() throws Exception {
+ // Sums window parts; every test window holds the single element 1 (see createTestWindow).
+ ReduceFunction<Integer> reducer = new ReduceFunction<Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer reduce(Integer a, Integer b) throws Exception {
+ return a + b;
+ }
+ };
+ // TestCollector records every window the merger emits.
+ TestCollector<StreamWindow<Integer>> out = new TestCollector<StreamWindow<Integer>>();
+ List<StreamWindow<Integer>> output = out.getCollected();
+ // Two upstream discretizers (the field is set directly by the test).
+ ParallelMerge<Integer> merger = new ParallelMerge<Integer>(reducer);
+ merger.numberOfDiscretizers = 2;
+ // Window 1: both parts arrive first; nothing is emitted until the second notification.
+ merger.flatMap1(createTestWindow(1), out);
+ merger.flatMap1(createTestWindow(1), out);
+ merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
+ assertTrue(output.isEmpty());
+ merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
+ assertEquals(StreamWindow.fromElements(2), output.get(0));
+ // Window 2: (2, 2) and (2, 1) presumably announce 2 + 1 parts, so the merge waits for a third part.
+ merger.flatMap2(new Tuple2<Integer, Integer>(2, 2), out);
+ merger.flatMap1(createTestWindow(2), out);
+ merger.flatMap1(createTestWindow(2), out);
+ merger.flatMap2(new Tuple2<Integer, Integer>(2, 1), out);
+ assertEquals(1, output.size());
+ merger.flatMap1(createTestWindow(2), out);
+ assertEquals(StreamWindow.fromElements(3), output.get(1));
+ // NOTE(review): the Tuple2 appears to be (window ID, part count from one discretizer) - TODO confirm in ParallelMerge.
+ // check error handling: window 3 presumably stays incomplete (one part arrives, two are announced)
+ merger.flatMap1(createTestWindow(3), out);
+ merger.flatMap2(new Tuple2<Integer, Integer>(3, 1), out);
+ merger.flatMap2(new Tuple2<Integer, Integer>(3, 1), out);
+ // Window 4: completing it is expected to throw, presumably because window 3 is still open.
+ merger.flatMap2(new Tuple2<Integer, Integer>(4, 1), out);
+ merger.flatMap2(new Tuple2<Integer, Integer>(4, 1), out);
+ merger.flatMap1(createTestWindow(4), out);
+ try {
+ merger.flatMap1(createTestWindow(4), out);
+ fail();
+ } catch (RuntimeException e) {
+ // expected: window 4 completed while window 3 was (presumably) still incomplete
+ }
+ // Fresh merger: window 0 receives a part but never any notifications.
+ ParallelMerge<Integer> merger2 = new ParallelMerge<Integer>(reducer);
+ merger2.numberOfDiscretizers = 2;
+ merger2.flatMap1(createTestWindow(0), out);
+ merger2.flatMap1(createTestWindow(1), out);
+ merger2.flatMap1(createTestWindow(1), out);
+ merger2.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
+ try {
+ merger2.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
+ fail();
+ } catch (RuntimeException e) {
+ // expected: window 1 completed while window 0 was (presumably) still incomplete
+ }
+
+ }
+
+ @Test
+ public void groupedTest() throws Exception {
+ // ParallelGroupedMerge takes no reducer: the merged window keeps the elements of all parts.
+ TestCollector<StreamWindow<Integer>> out = new TestCollector<StreamWindow<Integer>>();
+ List<StreamWindow<Integer>> output = out.getCollected();
+
+ ParallelMerge<Integer> merger = new ParallelGroupedMerge<Integer>();
+ merger.numberOfDiscretizers = 2;
+ // Same sequence as window 1 of nonGroupedTest, but the result is [1, 1] rather than [2].
+ merger.flatMap1(createTestWindow(1), out);
+ merger.flatMap1(createTestWindow(1), out);
+ merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
+ assertTrue(output.isEmpty());
+ merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
+ assertEquals(StreamWindow.fromElements(1, 1), output.get(0));
+ }
+ // Builds a window with the given ID containing the single element 1.
+ private StreamWindow<Integer> createTestWindow(Integer id) {
+ StreamWindow<Integer> ret = new StreamWindow<Integer>(id);
+ ret.add(1);
+ return ret;
+ }
+}