You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink target was not preserved in this plain-text version).
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/11 01:23:14 UTC

[4/5] flink git commit: [FLINK-1657] [streaming] Count window parallel discretization

[FLINK-1657] [streaming] Count window parallel discretization


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2be00ac7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2be00ac7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2be00ac7

Branch: refs/heads/master
Commit: 2be00ac7aca75c2fb9b66a554873b59970bfa21c
Parents: f09c0af
Author: Gyula Fora <gy...@apache.org>
Authored: Sat Mar 7 20:07:46 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Mar 10 16:33:43 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/DiscretizedStream.java       |  2 +
 .../api/datastream/WindowedDataStream.java      | 70 ++++++--------------
 .../windowing/WindowBufferInvokable.java        |  3 +
 .../streaming/api/windowing/WindowUtils.java    | 70 ++++++++++++++++++++
 .../windowing/WindowIntegrationTest.java        | 54 +++++++--------
 5 files changed, 124 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index dc2e987..451acf0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -35,6 +35,8 @@ import org.apache.flink.streaming.api.invokable.operator.windowing.WindowPartiti
 import org.apache.flink.streaming.api.invokable.operator.windowing.WindowReducer;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
+import org.apache.flink.streaming.api.windowing.WindowUtils.WindowKey;
+import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation;
 
 /**
  * A {@link DiscretizedStream} represents a data stream that has been divided

http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 12ef0e6..742ff22 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
@@ -44,6 +43,8 @@ import org.apache.flink.streaming.api.invokable.operator.windowing.WindowBufferI
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
 import org.apache.flink.streaming.api.windowing.WindowEvent;
+import org.apache.flink.streaming.api.windowing.WindowUtils;
+import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
@@ -71,16 +72,6 @@ import org.apache.flink.streaming.util.keys.KeySelectorUtil;
  */
 public class WindowedDataStream<OUT> {
 
-	protected enum WindowTransformation {
-		REDUCEWINDOW, MAPWINDOW, NONE;
-		private Function UDF;
-
-		public WindowTransformation with(Function UDF) {
-			this.UDF = UDF;
-			return this;
-		}
-	}
-
 	protected DataStream<OUT> dataStream;
 
 	protected boolean isLocal = false;
@@ -266,8 +257,7 @@ public class WindowedDataStream<OUT> {
 		WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
 				.with(reduceFunction);
 
-		WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation, getTrigger(),
-				getEviction(), discretizerKey);
+		WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation);
 
 		DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer);
 
@@ -324,11 +314,9 @@ public class WindowedDataStream<OUT> {
 	private DiscretizedStream<OUT> discretize(WindowTransformation transformation,
 			WindowBuffer<OUT> windowBuffer) {
 
-		StreamInvokable<OUT, WindowEvent<OUT>> discretizer = getDiscretizer(getTrigger(),
-				getEviction(), discretizerKey);
+		StreamInvokable<OUT, WindowEvent<OUT>> discretizer = getDiscretizer();
 
-		StreamInvokable<WindowEvent<OUT>, StreamWindow<OUT>> bufferInvokable = getBufferInvokable(
-				windowBuffer, discretizerKey);
+		StreamInvokable<WindowEvent<OUT>, StreamWindow<OUT>> bufferInvokable = getBufferInvokable(windowBuffer);
 
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		TypeInformation<WindowEvent<OUT>> bufferEventType = new TupleTypeInfo(WindowEvent.class,
@@ -346,30 +334,27 @@ public class WindowedDataStream<OUT> {
 	}
 
 	private int getDiscretizerParallelism() {
-		return isLocal || (discretizerKey != null) ? dataStream.environment
-				.getDegreeOfParallelism() : 1;
+		return isLocal || WindowUtils.isParallelPolicy(getTrigger(), getEviction())
+				|| (discretizerKey != null) ? dataStream.environment.getDegreeOfParallelism() : 1;
 	}
 
-	private StreamInvokable<OUT, WindowEvent<OUT>> getDiscretizer(TriggerPolicy<OUT> trigger,
-			EvictionPolicy<OUT> eviction, KeySelector<OUT, ?> discretizerKey) {
-
+	private StreamInvokable<OUT, WindowEvent<OUT>> getDiscretizer() {
 		if (discretizerKey == null) {
-			return new StreamDiscretizer<OUT>(trigger, eviction);
-		} else if (trigger instanceof TimeTriggerPolicy
-				&& ((TimeTriggerPolicy<OUT>) trigger).timestampWrapper.isDefaultTimestamp()) {
+			return new StreamDiscretizer<OUT>(getTrigger(), getEviction());
+		} else if (WindowUtils.isSystemTimeTrigger(getTrigger())) {
 			return new GroupedTimeDiscretizer<OUT>(discretizerKey,
-					(TimeTriggerPolicy<OUT>) trigger, (CloneableEvictionPolicy<OUT>) eviction);
-		}
-
-		else {
+					(TimeTriggerPolicy<OUT>) getTrigger(),
+					(CloneableEvictionPolicy<OUT>) getEviction());
+		} else {
 			return new GroupedStreamDiscretizer<OUT>(discretizerKey,
-					(CloneableTriggerPolicy<OUT>) trigger, (CloneableEvictionPolicy<OUT>) eviction);
+					(CloneableTriggerPolicy<OUT>) getTrigger(),
+					(CloneableEvictionPolicy<OUT>) getEviction());
 		}
 
 	}
 
 	private StreamInvokable<WindowEvent<OUT>, StreamWindow<OUT>> getBufferInvokable(
-			WindowBuffer<OUT> windowBuffer, KeySelector<OUT, ?> discretizerKey) {
+			WindowBuffer<OUT> windowBuffer) {
 		if (discretizerKey == null) {
 			return new WindowBufferInvokable<OUT>(windowBuffer);
 		} else {
@@ -378,20 +363,18 @@ public class WindowedDataStream<OUT> {
 	}
 
 	@SuppressWarnings("unchecked")
-	private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation,
-			TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> eviction,
-			KeySelector<OUT, ?> discretizerKey) {
+	private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation) {
 
 		if (transformation == WindowTransformation.REDUCEWINDOW
-				&& eviction instanceof TumblingEvictionPolicy) {
+				&& getEviction() instanceof TumblingEvictionPolicy) {
 			if (groupByKey == null) {
 				return new TumblingPreReducer<OUT>(
-						clean((ReduceFunction<OUT>) transformation.UDF), getType()
+						clean((ReduceFunction<OUT>) transformation.getUDF()), getType()
 								.createSerializer(getExecutionConfig()));
 			} else {
 				return new TumblingGroupedPreReducer<OUT>(
-						clean((ReduceFunction<OUT>) transformation.UDF), groupByKey,
-						getType().createSerializer(getExecutionConfig()));
+						clean((ReduceFunction<OUT>) transformation.getUDF()), groupByKey, getType()
+								.createSerializer(getExecutionConfig()));
 			}
 		}
 		return new BasicWindowBuffer<OUT>();
@@ -678,15 +661,4 @@ public class WindowedDataStream<OUT> {
 	protected WindowedDataStream<OUT> copy() {
 		return new WindowedDataStream<OUT>(this);
 	}
-
-	protected static class WindowKey<R> implements KeySelector<StreamWindow<R>, Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer getKey(StreamWindow<R> value) throws Exception {
-			return value.windowID;
-		}
-
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
index 75f7d9d..72be823 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
@@ -52,10 +52,13 @@ public class WindowBufferInvokable<T> extends ChainableInvokable<WindowEvent<T>,
 	protected void handleWindowEvent(WindowEvent<T> windowEvent, WindowBuffer<T> buffer)
 			throws Exception {
 		if (windowEvent.isElement()) {
+			System.out.println("element: " + windowEvent.getElement());
 			buffer.store(windowEvent.getElement());
 		} else if (windowEvent.isEviction()) {
+			System.out.println("eviction: " + windowEvent.getEviction());
 			buffer.evict(windowEvent.getEviction());
 		} else if (windowEvent.isTrigger()) {
+			System.out.println("trigger");
 			buffer.emitWindow(collector);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
new file mode 100644
index 0000000..1e3ba86
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
+
+public class WindowUtils {
+
+	public enum WindowTransformation {
+		REDUCEWINDOW, MAPWINDOW, NONE;
+		private Function UDF;
+
+		public WindowTransformation with(Function UDF) {
+			this.UDF = UDF;
+			return this;
+		}
+
+		public Function getUDF() {
+			return UDF;
+		}
+	}
+
+	public static boolean isParallelPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
+		return (eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy))
+				|| (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy);
+	}
+
+	public static boolean isTimeOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
+		return trigger instanceof TimeTriggerPolicy && eviction instanceof TimeEvictionPolicy;
+	}
+
+	public static boolean isSystemTimeTrigger(TriggerPolicy<?> trigger) {
+		return trigger instanceof TimeTriggerPolicy
+				&& ((TimeTriggerPolicy<?>) trigger).timestampWrapper.isDefaultTimestamp();
+	}
+
+	public static class WindowKey<R> implements KeySelector<StreamWindow<R>, Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer getKey(StreamWindow<R> value) throws Exception {
+			return value.windowID;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index 2ed0002..25b5f87 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -89,17 +89,6 @@ public class WindowIntegrationTest implements Serializable {
 		inputs.add(11);
 		inputs.add(11);
 
-		StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
-
-		DataStream<Integer> source = env.fromCollection(inputs);
-
-		source.window(Count.of(2)).every(Count.of(3)).sum(0).getDiscretizedStream()
-				.addSink(new CentralSink1());
-
-		source.window(Count.of(4)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
-				.flatten().addSink(new CentralSink2());
-
-		KeySelector<Integer, ?> key = new ModKey(2);
 		Timestamp<Integer> ts = new Timestamp<Integer>() {
 
 			private static final long serialVersionUID = 1L;
@@ -110,38 +99,50 @@ public class WindowIntegrationTest implements Serializable {
 			}
 		};
 
+		StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
+
+		DataStream<Integer> source = env.fromCollection(inputs);
+
+		source.window(Time.of(2, ts)).every(Time.of(3, ts)).sum(0).getDiscretizedStream()
+				.addSink(new CentralSink1());
+
+		source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
+				.flatten().addSink(new CentralSink2());
+
+		KeySelector<Integer, ?> key = new ModKey(2);
+
 		source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
 				.addSink(new DistributedSink1());
 
 		source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2))
 				.mapWindow(new IdentityWindowMap()).flatten().addSink(new DistributedSink2());
 
-		source.window(Count.of(2)).every(Count.of(3)).min(0).getDiscretizedStream()
+		source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream()
 				.addSink(new CentralSink3());
 
 		source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
 				.addSink(new DistributedSink3());
 
-		source.window(Count.of(5)).mapWindow(new IdentityWindowMap()).flatten()
+		source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
 				.addSink(new DistributedSink4());
 
 		env.execute();
 
-		// sum ( Count of 2 slide 3 )
+		// sum ( Time of 2 slide 3 )
 		List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
-		expected1.add(StreamWindow.fromElements(4));
+		expected1.add(StreamWindow.fromElements(5));
 		expected1.add(StreamWindow.fromElements(9));
-		expected1.add(StreamWindow.fromElements(22));
+		expected1.add(StreamWindow.fromElements(32));
 
 		validateOutput(expected1, CentralSink1.windows);
 
-		// Tumbling Count of 4 grouped by mod 2
+		// Tumbling Time of 4 grouped by mod 2
 		List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
-		expected2.add(StreamWindow.fromElements(2, 2));
+		expected2.add(StreamWindow.fromElements(2, 2, 4));
 		expected2.add(StreamWindow.fromElements(1, 3));
-		expected2.add(StreamWindow.fromElements(4, 10));
-		expected2.add(StreamWindow.fromElements(5, 11));
-		expected2.add(StreamWindow.fromElements(11));
+		expected2.add(StreamWindow.fromElements(5));
+		expected2.add(StreamWindow.fromElements(10));
+		expected2.add(StreamWindow.fromElements(11, 11));
 
 		validateOutput(expected2, CentralSink2.windows);
 
@@ -167,11 +168,11 @@ public class WindowIntegrationTest implements Serializable {
 
 		validateOutput(expected4, DistributedSink2.windows);
 
-		// min ( Count of 2 slide 3 )
+		// min ( Time of 2 slide 3 )
 		List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
-		expected5.add(StreamWindow.fromElements(2));
+		expected5.add(StreamWindow.fromElements(1));
 		expected5.add(StreamWindow.fromElements(4));
-		expected5.add(StreamWindow.fromElements(11));
+		expected5.add(StreamWindow.fromElements(10));
 
 		validateOutput(expected5, CentralSink3.windows);
 
@@ -186,8 +187,9 @@ public class WindowIntegrationTest implements Serializable {
 		validateOutput(expected6, DistributedSink3.windows);
 
 		List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
-		expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4));
-		expected7.add(StreamWindow.fromElements(5, 10, 11, 11));
+		expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
+		expected7.add(StreamWindow.fromElements(10));
+		expected7.add(StreamWindow.fromElements(10, 11, 11));
 
 		validateOutput(expected7, DistributedSink4.windows);