You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/21 08:00:29 UTC

flink git commit: [streaming] Parallel time reduce logic refactor to force prereducers

Repository: flink
Updated Branches:
  refs/heads/master e2a00183e -> a0900e732


[streaming] Parallel time reduce logic refactor to force prereducers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0900e73
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0900e73
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0900e73

Branch: refs/heads/master
Commit: a0900e7324b7a2f85080d151ab5ed1f015c257df
Parents: e2a0018
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Apr 20 22:05:32 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Mon Apr 20 22:05:32 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/DiscretizedStream.java       | 23 ++------
 .../api/datastream/WindowedDataStream.java      | 56 +++++++++++---------
 2 files changed, 36 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a0900e73/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index 2a58f60..c6ee36d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -106,31 +106,20 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 	 * 
 	 * @param reduceFunction
 	 *            The reduce function to be applied on the windows
-	 * @param isPreAggregated
-	 *            Flag whether the window buffer was a pre-aggregator or not
 	 * @return
 	 */
-	protected DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> reduceFunction,
-			boolean isPreAggregated) {
-
-		// We partition the windowed stream if it is not already pre-aggregated
-		DiscretizedStream<OUT> partitioned = isPreAggregated ? this : partition(transformation);
+	protected DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> reduceFunction) {
 
 		// Since we also emit the empty windows for bookkeeping, we need to
 		// filter them out
-		DiscretizedStream<OUT> nonEmpty = filterEmpty(partitioned);
+		DiscretizedStream<OUT> nonEmpty = filterEmpty(this);
 
 		// We extract the number of parts from each window we will merge using
 		// this afterwards
-		DataStream<Tuple2<Integer, Integer>> numOfParts = extractPartsByID(partitioned);
-
-		// We reduce the windows if not pre-aggregated
-		DiscretizedStream<OUT> reduced = isPreAggregated ? nonEmpty : nonEmpty.transform(
-				WindowTransformation.REDUCEWINDOW, "Window Reduce", nonEmpty.getType(),
-				new WindowReducer<OUT>(reduceFunction));
+		DataStream<Tuple2<Integer, Integer>> numOfParts = extractPartsByID(this);
 
 		// We merge the windows by the number of parts
-		return wrap(parallelMerge(numOfParts, reduced, reduceFunction), false);
+		return wrap(parallelMerge(numOfParts, nonEmpty, reduceFunction), false);
 
 	}
 
@@ -215,8 +204,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 
 			return out;
 		} else if (transformation == WindowTransformation.REDUCEWINDOW
-				&& parallelism != discretizedStream.getExecutionEnvironment()
-						.getParallelism()) {
+				&& parallelism != discretizedStream.getExecutionEnvironment().getParallelism()) {
 			DiscretizedStream<OUT> out = transform(transformation, "Window partitioner", getType(),
 					new WindowPartitioner<OUT>(parallelism)).setParallelism(parallelism);
 
@@ -224,7 +212,6 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 
 			return out;
 		} else {
-			this.isPartitioned = false;
 			return this;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0900e73/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 15e0179..c2ebdcf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -393,16 +393,18 @@ public class WindowedDataStream<OUT> {
 				.setParallelism(parallelism)
 				.transform(windowBuffer.getClass().getSimpleName(),
 						new StreamWindowTypeInfo<OUT>(getType()), bufferOperator)
-				.setParallelism(parallelism), groupByKey, transformation, false);
+				.setParallelism(parallelism), groupByKey, transformation,
+				WindowUtils.isParallelPolicy(getTrigger(), getEviction(),
+						dataStream.getParallelism()));
 
 	}
 
 	/**
-	 * Returns the parallelism for the stream discretizer. The
-	 * returned parallelism is either 1 for for non-parallel global policies (or
-	 * when the input stream is non-parallel), environment parallelism for the
-	 * policies that can run in parallel (such as, any ditributed policy, reduce
-	 * by count or time).
+	 * Returns the parallelism for the stream discretizer. The returned
+	 * parallelism is either 1 for non-parallel global policies (or when the
+	 * input stream is non-parallel), or the environment parallelism for
+	 * policies that can run in parallel (such as any distributed policy, or
+	 * reduce by count or time).
 	 * 
 	 * @param transformation
 	 *            The applied transformation
@@ -445,7 +447,12 @@ public class WindowedDataStream<OUT> {
 		// discretized stream, we also pass the type of the windowbuffer
 		DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer);
 
-		return discretized.timeReduce(reduceFunction, windowBuffer instanceof PreAggregator);
+		if (!(windowBuffer instanceof PreAggregator)) {
+			throw new RuntimeException(
+					"Error in preaggregator logic, parallel time reduce should always be preaggregated");
+		}
+
+		return discretized.timeReduce(reduceFunction);
 
 	}
 
@@ -529,31 +536,30 @@ public class WindowedDataStream<OUT> {
 							WindowUtils.getTimeStampWrapper(trigger));
 				}
 
-			} else if(WindowUtils.isJumpingCountPolicy(trigger, eviction)){
-				if(groupByKey == null){
-					return new JumpingCountPreReducer<OUT>((ReduceFunction<OUT>) transformation.getUDF(), getType()
-							.createSerializer(getExecutionConfig()),
+			} else if (WindowUtils.isJumpingCountPolicy(trigger, eviction)) {
+				if (groupByKey == null) {
+					return new JumpingCountPreReducer<OUT>(
+							(ReduceFunction<OUT>) transformation.getUDF(), getType()
+									.createSerializer(getExecutionConfig()),
 							WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
 				} else {
 					return new JumpingCountGroupedPreReducer<OUT>(
-							(ReduceFunction<OUT>) transformation.getUDF(),
-							groupByKey,
-							getType().createSerializer(getExecutionConfig()),
+							(ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType()
+									.createSerializer(getExecutionConfig()),
 							WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
 				}
-			} else if(WindowUtils.isJumpingTimePolicy(trigger, eviction)){
-				if(groupByKey == null) {
-					return new JumpingTimePreReducer<OUT>((ReduceFunction<OUT>) transformation.getUDF(),
-							getType().createSerializer(getExecutionConfig()),
-							WindowUtils.getSlideSize(trigger),
-							WindowUtils.getWindowSize(eviction),
+			} else if (WindowUtils.isJumpingTimePolicy(trigger, eviction)) {
+				if (groupByKey == null) {
+					return new JumpingTimePreReducer<OUT>(
+							(ReduceFunction<OUT>) transformation.getUDF(), getType()
+									.createSerializer(getExecutionConfig()),
+							WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction),
 							WindowUtils.getTimeStampWrapper(trigger));
 				} else {
-					return new JumpingTimeGroupedPreReducer<OUT>((ReduceFunction<OUT>) transformation.getUDF(),
-							groupByKey,
-							getType().createSerializer(getExecutionConfig()),
-							WindowUtils.getSlideSize(trigger),
-							WindowUtils.getWindowSize(eviction),
+					return new JumpingTimeGroupedPreReducer<OUT>(
+							(ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType()
+									.createSerializer(getExecutionConfig()),
+							WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction),
 							WindowUtils.getTimeStampWrapper(trigger));
 				}
 			}