You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/21 08:00:29 UTC
flink git commit: [streaming] Parallel time reduce logic refactor to force prereducers
Repository: flink
Updated Branches:
refs/heads/master e2a00183e -> a0900e732
[streaming] Parallel time reduce logic refactor to force prereducers
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0900e73
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0900e73
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0900e73
Branch: refs/heads/master
Commit: a0900e7324b7a2f85080d151ab5ed1f015c257df
Parents: e2a0018
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Apr 20 22:05:32 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Mon Apr 20 22:05:32 2015 +0200
----------------------------------------------------------------------
.../api/datastream/DiscretizedStream.java | 23 ++------
.../api/datastream/WindowedDataStream.java | 56 +++++++++++---------
2 files changed, 36 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a0900e73/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index 2a58f60..c6ee36d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -106,31 +106,20 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
*
* @param reduceFunction
* The reduce function to be applied on the windows
- * @param isPreAggregated
- * Flag whether the window buffer was a pre-aggregator or not
* @return
*/
- protected DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> reduceFunction,
- boolean isPreAggregated) {
-
- // We partition the windowed stream if it is not already pre-aggregated
- DiscretizedStream<OUT> partitioned = isPreAggregated ? this : partition(transformation);
+ protected DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> reduceFunction) {
// Since we also emit the empty windows for bookkeeping, we need to
// filter them out
- DiscretizedStream<OUT> nonEmpty = filterEmpty(partitioned);
+ DiscretizedStream<OUT> nonEmpty = filterEmpty(this);
// We extract the number of parts from each window we will merge using
// this afterwards
- DataStream<Tuple2<Integer, Integer>> numOfParts = extractPartsByID(partitioned);
-
- // We reduce the windows if not pre-aggregated
- DiscretizedStream<OUT> reduced = isPreAggregated ? nonEmpty : nonEmpty.transform(
- WindowTransformation.REDUCEWINDOW, "Window Reduce", nonEmpty.getType(),
- new WindowReducer<OUT>(reduceFunction));
+ DataStream<Tuple2<Integer, Integer>> numOfParts = extractPartsByID(this);
// We merge the windows by the number of parts
- return wrap(parallelMerge(numOfParts, reduced, reduceFunction), false);
+ return wrap(parallelMerge(numOfParts, nonEmpty, reduceFunction), false);
}
@@ -215,8 +204,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
return out;
} else if (transformation == WindowTransformation.REDUCEWINDOW
- && parallelism != discretizedStream.getExecutionEnvironment()
- .getParallelism()) {
+ && parallelism != discretizedStream.getExecutionEnvironment().getParallelism()) {
DiscretizedStream<OUT> out = transform(transformation, "Window partitioner", getType(),
new WindowPartitioner<OUT>(parallelism)).setParallelism(parallelism);
@@ -224,7 +212,6 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
return out;
} else {
- this.isPartitioned = false;
return this;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a0900e73/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 15e0179..c2ebdcf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -393,16 +393,18 @@ public class WindowedDataStream<OUT> {
.setParallelism(parallelism)
.transform(windowBuffer.getClass().getSimpleName(),
new StreamWindowTypeInfo<OUT>(getType()), bufferOperator)
- .setParallelism(parallelism), groupByKey, transformation, false);
+ .setParallelism(parallelism), groupByKey, transformation,
+ WindowUtils.isParallelPolicy(getTrigger(), getEviction(),
+ dataStream.getParallelism()));
}
/**
- * Returns the parallelism for the stream discretizer. The
- * returned parallelism is either 1 for for non-parallel global policies (or
- * when the input stream is non-parallel), environment parallelism for the
- * policies that can run in parallel (such as, any ditributed policy, reduce
- * by count or time).
+ * Returns the parallelism for the stream discretizer. The returned
+ * parallelism is either 1 for non-parallel global policies (or when the
+ * input stream is non-parallel), or the environment parallelism for policies
+ * that can run in parallel (such as any distributed policy, reduce by count
+ * or time).
*
* @param transformation
* The applied transformation
@@ -445,7 +447,12 @@ public class WindowedDataStream<OUT> {
// discretized stream, we also pass the type of the windowbuffer
DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer);
- return discretized.timeReduce(reduceFunction, windowBuffer instanceof PreAggregator);
+ if (!(windowBuffer instanceof PreAggregator)) {
+ throw new RuntimeException(
+ "Error in preaggregator logic, parallel time reduce should always be preaggregated");
+ }
+
+ return discretized.timeReduce(reduceFunction);
}
@@ -529,31 +536,30 @@ public class WindowedDataStream<OUT> {
WindowUtils.getTimeStampWrapper(trigger));
}
- } else if(WindowUtils.isJumpingCountPolicy(trigger, eviction)){
- if(groupByKey == null){
- return new JumpingCountPreReducer<OUT>((ReduceFunction<OUT>) transformation.getUDF(), getType()
- .createSerializer(getExecutionConfig()),
+ } else if (WindowUtils.isJumpingCountPolicy(trigger, eviction)) {
+ if (groupByKey == null) {
+ return new JumpingCountPreReducer<OUT>(
+ (ReduceFunction<OUT>) transformation.getUDF(), getType()
+ .createSerializer(getExecutionConfig()),
WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
} else {
return new JumpingCountGroupedPreReducer<OUT>(
- (ReduceFunction<OUT>) transformation.getUDF(),
- groupByKey,
- getType().createSerializer(getExecutionConfig()),
+ (ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType()
+ .createSerializer(getExecutionConfig()),
WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
}
- } else if(WindowUtils.isJumpingTimePolicy(trigger, eviction)){
- if(groupByKey == null) {
- return new JumpingTimePreReducer<OUT>((ReduceFunction<OUT>) transformation.getUDF(),
- getType().createSerializer(getExecutionConfig()),
- WindowUtils.getSlideSize(trigger),
- WindowUtils.getWindowSize(eviction),
+ } else if (WindowUtils.isJumpingTimePolicy(trigger, eviction)) {
+ if (groupByKey == null) {
+ return new JumpingTimePreReducer<OUT>(
+ (ReduceFunction<OUT>) transformation.getUDF(), getType()
+ .createSerializer(getExecutionConfig()),
+ WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction),
WindowUtils.getTimeStampWrapper(trigger));
} else {
- return new JumpingTimeGroupedPreReducer<OUT>((ReduceFunction<OUT>) transformation.getUDF(),
- groupByKey,
- getType().createSerializer(getExecutionConfig()),
- WindowUtils.getSlideSize(trigger),
- WindowUtils.getWindowSize(eviction),
+ return new JumpingTimeGroupedPreReducer<OUT>(
+ (ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType()
+ .createSerializer(getExecutionConfig()),
+ WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction),
WindowUtils.getTimeStampWrapper(trigger));
}
}