You are viewing a plain-text version of this content. (The link to the canonical version is not preserved in this rendering.)
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/11 01:23:14 UTC
[4/5] flink git commit: [FLINK-1657] [streaming] Count window parallel discretization
[FLINK-1657] [streaming] Count window parallel discretization
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2be00ac7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2be00ac7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2be00ac7
Branch: refs/heads/master
Commit: 2be00ac7aca75c2fb9b66a554873b59970bfa21c
Parents: f09c0af
Author: Gyula Fora <gy...@apache.org>
Authored: Sat Mar 7 20:07:46 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Tue Mar 10 16:33:43 2015 +0100
----------------------------------------------------------------------
.../api/datastream/DiscretizedStream.java | 2 +
.../api/datastream/WindowedDataStream.java | 70 ++++++--------------
.../windowing/WindowBufferInvokable.java | 3 +
.../streaming/api/windowing/WindowUtils.java | 70 ++++++++++++++++++++
.../windowing/WindowIntegrationTest.java | 54 +++++++--------
5 files changed, 124 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index dc2e987..451acf0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -35,6 +35,8 @@ import org.apache.flink.streaming.api.invokable.operator.windowing.WindowPartiti
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowReducer;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
+import org.apache.flink.streaming.api.windowing.WindowUtils.WindowKey;
+import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation;
/**
* A {@link DiscretizedStream} represents a data stream that has been divided
http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 12ef0e6..742ff22 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
@@ -44,6 +43,8 @@ import org.apache.flink.streaming.api.invokable.operator.windowing.WindowBufferI
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
import org.apache.flink.streaming.api.windowing.WindowEvent;
+import org.apache.flink.streaming.api.windowing.WindowUtils;
+import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
@@ -71,16 +72,6 @@ import org.apache.flink.streaming.util.keys.KeySelectorUtil;
*/
public class WindowedDataStream<OUT> {
- protected enum WindowTransformation {
- REDUCEWINDOW, MAPWINDOW, NONE;
- private Function UDF;
-
- public WindowTransformation with(Function UDF) {
- this.UDF = UDF;
- return this;
- }
- }
-
protected DataStream<OUT> dataStream;
protected boolean isLocal = false;
@@ -266,8 +257,7 @@ public class WindowedDataStream<OUT> {
WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
.with(reduceFunction);
- WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation, getTrigger(),
- getEviction(), discretizerKey);
+ WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation);
DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer);
@@ -324,11 +314,9 @@ public class WindowedDataStream<OUT> {
private DiscretizedStream<OUT> discretize(WindowTransformation transformation,
WindowBuffer<OUT> windowBuffer) {
- StreamInvokable<OUT, WindowEvent<OUT>> discretizer = getDiscretizer(getTrigger(),
- getEviction(), discretizerKey);
+ StreamInvokable<OUT, WindowEvent<OUT>> discretizer = getDiscretizer();
- StreamInvokable<WindowEvent<OUT>, StreamWindow<OUT>> bufferInvokable = getBufferInvokable(
- windowBuffer, discretizerKey);
+ StreamInvokable<WindowEvent<OUT>, StreamWindow<OUT>> bufferInvokable = getBufferInvokable(windowBuffer);
@SuppressWarnings({ "unchecked", "rawtypes" })
TypeInformation<WindowEvent<OUT>> bufferEventType = new TupleTypeInfo(WindowEvent.class,
@@ -346,30 +334,27 @@ public class WindowedDataStream<OUT> {
}
private int getDiscretizerParallelism() {
- return isLocal || (discretizerKey != null) ? dataStream.environment
- .getDegreeOfParallelism() : 1;
+ return isLocal || WindowUtils.isParallelPolicy(getTrigger(), getEviction())
+ || (discretizerKey != null) ? dataStream.environment.getDegreeOfParallelism() : 1;
}
- private StreamInvokable<OUT, WindowEvent<OUT>> getDiscretizer(TriggerPolicy<OUT> trigger,
- EvictionPolicy<OUT> eviction, KeySelector<OUT, ?> discretizerKey) {
-
+ private StreamInvokable<OUT, WindowEvent<OUT>> getDiscretizer() {
if (discretizerKey == null) {
- return new StreamDiscretizer<OUT>(trigger, eviction);
- } else if (trigger instanceof TimeTriggerPolicy
- && ((TimeTriggerPolicy<OUT>) trigger).timestampWrapper.isDefaultTimestamp()) {
+ return new StreamDiscretizer<OUT>(getTrigger(), getEviction());
+ } else if (WindowUtils.isSystemTimeTrigger(getTrigger())) {
return new GroupedTimeDiscretizer<OUT>(discretizerKey,
- (TimeTriggerPolicy<OUT>) trigger, (CloneableEvictionPolicy<OUT>) eviction);
- }
-
- else {
+ (TimeTriggerPolicy<OUT>) getTrigger(),
+ (CloneableEvictionPolicy<OUT>) getEviction());
+ } else {
return new GroupedStreamDiscretizer<OUT>(discretizerKey,
- (CloneableTriggerPolicy<OUT>) trigger, (CloneableEvictionPolicy<OUT>) eviction);
+ (CloneableTriggerPolicy<OUT>) getTrigger(),
+ (CloneableEvictionPolicy<OUT>) getEviction());
}
}
private StreamInvokable<WindowEvent<OUT>, StreamWindow<OUT>> getBufferInvokable(
- WindowBuffer<OUT> windowBuffer, KeySelector<OUT, ?> discretizerKey) {
+ WindowBuffer<OUT> windowBuffer) {
if (discretizerKey == null) {
return new WindowBufferInvokable<OUT>(windowBuffer);
} else {
@@ -378,20 +363,18 @@ public class WindowedDataStream<OUT> {
}
@SuppressWarnings("unchecked")
- private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation,
- TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> eviction,
- KeySelector<OUT, ?> discretizerKey) {
+ private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation) {
if (transformation == WindowTransformation.REDUCEWINDOW
- && eviction instanceof TumblingEvictionPolicy) {
+ && getEviction() instanceof TumblingEvictionPolicy) {
if (groupByKey == null) {
return new TumblingPreReducer<OUT>(
- clean((ReduceFunction<OUT>) transformation.UDF), getType()
+ clean((ReduceFunction<OUT>) transformation.getUDF()), getType()
.createSerializer(getExecutionConfig()));
} else {
return new TumblingGroupedPreReducer<OUT>(
- clean((ReduceFunction<OUT>) transformation.UDF), groupByKey,
- getType().createSerializer(getExecutionConfig()));
+ clean((ReduceFunction<OUT>) transformation.getUDF()), groupByKey, getType()
+ .createSerializer(getExecutionConfig()));
}
}
return new BasicWindowBuffer<OUT>();
@@ -678,15 +661,4 @@ public class WindowedDataStream<OUT> {
protected WindowedDataStream<OUT> copy() {
return new WindowedDataStream<OUT>(this);
}
-
- protected static class WindowKey<R> implements KeySelector<StreamWindow<R>, Integer> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer getKey(StreamWindow<R> value) throws Exception {
- return value.windowID;
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
index 75f7d9d..72be823 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
@@ -52,10 +52,13 @@ public class WindowBufferInvokable<T> extends ChainableInvokable<WindowEvent<T>,
protected void handleWindowEvent(WindowEvent<T> windowEvent, WindowBuffer<T> buffer)
throws Exception {
if (windowEvent.isElement()) {
+ System.out.println("element: " + windowEvent.getElement()); // NOTE(review): prints every element to stdout on the per-event path; looks like leftover debugging - remove or route through a logger
buffer.store(windowEvent.getElement());
} else if (windowEvent.isEviction()) {
+ System.out.println("eviction: " + windowEvent.getEviction()); // NOTE(review): debug print to stdout on every eviction event - looks unintended, remove
buffer.evict(windowEvent.getEviction());
} else if (windowEvent.isTrigger()) {
+ System.out.println("trigger"); // NOTE(review): debug print to stdout on every trigger event - looks unintended, remove
buffer.emitWindow(collector);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
new file mode 100644
index 0000000..1e3ba86
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
+/** Static windowing helpers: trigger/eviction policy classification, the WindowTransformation enum and the WindowKey selector. */
+public class WindowUtils {
+ /** Kind of window transformation (REDUCEWINDOW, MAPWINDOW or NONE). NOTE(review): the UDF is stored in a field of the enum constant itself; enum constants are singletons, so this is shared mutable state and interleaved with() calls overwrite each other's UDF. */
+ public enum WindowTransformation {
+ REDUCEWINDOW, MAPWINDOW, NONE;
+ private Function UDF;
+ /** Stores the given UDF on this (shared) enum constant and returns the constant itself, for chaining. */
+ public WindowTransformation with(Function UDF) {
+ this.UDF = UDF;
+ return this;
+ }
+ /** Returns the UDF most recently stored on this constant via with(), or null if none was set. */
+ public Function getUDF() {
+ return UDF;
+ }
+ }
+ /** Returns true for the policy pairs treated as parallelizable: count eviction with a count or time trigger, or tumbling eviction with a count trigger. */
+ public static boolean isParallelPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
+ return (eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy))
+ || (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy);
+ }
+ /** Returns true if the trigger is a TimeTriggerPolicy and the eviction is a TimeEvictionPolicy. */
+ public static boolean isTimeOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
+ return trigger instanceof TimeTriggerPolicy && eviction instanceof TimeEvictionPolicy;
+ }
+ /** Returns true for a TimeTriggerPolicy whose timestamp wrapper reports isDefaultTimestamp() (presumably system time, per the method name - confirm). */
+ public static boolean isSystemTimeTrigger(TriggerPolicy<?> trigger) {
+ return trigger instanceof TimeTriggerPolicy
+ && ((TimeTriggerPolicy<?>) trigger).timestampWrapper.isDefaultTimestamp();
+ }
+ /** KeySelector that keys a StreamWindow by its windowID field. */
+ public static class WindowKey<R> implements KeySelector<StreamWindow<R>, Integer> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer getKey(StreamWindow<R> value) throws Exception {
+ return value.windowID;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2be00ac7/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index 2ed0002..25b5f87 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -89,17 +89,6 @@ public class WindowIntegrationTest implements Serializable {
inputs.add(11);
inputs.add(11);
- StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
-
- DataStream<Integer> source = env.fromCollection(inputs);
-
- source.window(Count.of(2)).every(Count.of(3)).sum(0).getDiscretizedStream()
- .addSink(new CentralSink1());
-
- source.window(Count.of(4)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
- .flatten().addSink(new CentralSink2());
-
- KeySelector<Integer, ?> key = new ModKey(2);
Timestamp<Integer> ts = new Timestamp<Integer>() {
private static final long serialVersionUID = 1L;
@@ -110,38 +99,50 @@ public class WindowIntegrationTest implements Serializable {
}
};
+ StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
+
+ DataStream<Integer> source = env.fromCollection(inputs);
+
+ source.window(Time.of(2, ts)).every(Time.of(3, ts)).sum(0).getDiscretizedStream()
+ .addSink(new CentralSink1());
+
+ source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
+ .flatten().addSink(new CentralSink2());
+
+ KeySelector<Integer, ?> key = new ModKey(2);
+
source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
.addSink(new DistributedSink1());
source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2))
.mapWindow(new IdentityWindowMap()).flatten().addSink(new DistributedSink2());
- source.window(Count.of(2)).every(Count.of(3)).min(0).getDiscretizedStream()
+ source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream()
.addSink(new CentralSink3());
source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
.addSink(new DistributedSink3());
- source.window(Count.of(5)).mapWindow(new IdentityWindowMap()).flatten()
+ source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
.addSink(new DistributedSink4());
env.execute();
- // sum ( Count of 2 slide 3 )
+ // sum ( Time of 2 slide 3 )
List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
- expected1.add(StreamWindow.fromElements(4));
+ expected1.add(StreamWindow.fromElements(5));
expected1.add(StreamWindow.fromElements(9));
- expected1.add(StreamWindow.fromElements(22));
+ expected1.add(StreamWindow.fromElements(32));
validateOutput(expected1, CentralSink1.windows);
- // Tumbling Count of 4 grouped by mod 2
+ // Tumbling Time of 4 grouped by mod 2
List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
- expected2.add(StreamWindow.fromElements(2, 2));
+ expected2.add(StreamWindow.fromElements(2, 2, 4));
expected2.add(StreamWindow.fromElements(1, 3));
- expected2.add(StreamWindow.fromElements(4, 10));
- expected2.add(StreamWindow.fromElements(5, 11));
- expected2.add(StreamWindow.fromElements(11));
+ expected2.add(StreamWindow.fromElements(5));
+ expected2.add(StreamWindow.fromElements(10));
+ expected2.add(StreamWindow.fromElements(11, 11));
validateOutput(expected2, CentralSink2.windows);
@@ -167,11 +168,11 @@ public class WindowIntegrationTest implements Serializable {
validateOutput(expected4, DistributedSink2.windows);
- // min ( Count of 2 slide 3 )
+ // min ( Time of 2 slide 3 )
List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
- expected5.add(StreamWindow.fromElements(2));
+ expected5.add(StreamWindow.fromElements(1));
expected5.add(StreamWindow.fromElements(4));
- expected5.add(StreamWindow.fromElements(11));
+ expected5.add(StreamWindow.fromElements(10));
validateOutput(expected5, CentralSink3.windows);
@@ -186,8 +187,9 @@ public class WindowIntegrationTest implements Serializable {
validateOutput(expected6, DistributedSink3.windows);
List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
- expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4));
- expected7.add(StreamWindow.fromElements(5, 10, 11, 11));
+ expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
+ expected7.add(StreamWindow.fromElements(10));
+ expected7.add(StreamWindow.fromElements(10, 11, 11));
validateOutput(expected7, DistributedSink4.windows);