You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:35 UTC
[30/34] incubator-flink git commit: [streaming] Central trigger/eviction API integration + invokable refactor
[streaming] Central trigger/eviction API integration + invokable refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3f74e059
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3f74e059
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3f74e059
Branch: refs/heads/master
Commit: 3f74e0597e62e0392dae337ca26d107538544513
Parents: b082af0
Author: Gyula Fora <gy...@apache.org>
Authored: Fri Nov 28 17:25:31 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Fri Dec 5 16:45:10 2014 +0100
----------------------------------------------------------------------
.../api/datastream/WindowedDataStream.java | 58 +++++++---
.../operator/GroupedWindowInvokable.java | 116 ++++++++++---------
.../api/invokable/operator/WindowInvokable.java | 24 ++--
.../windowing/TimeWindowingExample.java | 31 ++---
4 files changed, 136 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f74e059/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 622c03d..15e5110 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -55,6 +55,7 @@ public class WindowedDataStream<OUT> {
protected DataStream<OUT> dataStream;
protected boolean isGrouped;
+ protected boolean allCentral;
protected KeySelector<OUT, ?> keySelector;
protected List<WindowingHelper<OUT>> triggerHelpers;
@@ -73,9 +74,13 @@ public class WindowedDataStream<OUT> {
if (dataStream instanceof GroupedDataStream) {
this.isGrouped = true;
this.keySelector = ((GroupedDataStream<OUT>) dataStream).keySelector;
+ // set all policies distributed
+ this.allCentral = false;
} else {
this.isGrouped = false;
+ // set all policies central
+ this.allCentral = true;
}
}
@@ -96,9 +101,13 @@ public class WindowedDataStream<OUT> {
if (dataStream instanceof GroupedDataStream) {
this.isGrouped = true;
this.keySelector = ((GroupedDataStream<OUT>) dataStream).keySelector;
+ // set all policies distributed
+ this.allCentral = false;
} else {
this.isGrouped = false;
+ // set all policies central
+ this.allCentral = true;
}
}
@@ -110,6 +119,7 @@ public class WindowedDataStream<OUT> {
this.evictionHelpers = windowedDataStream.evictionHelpers;
this.userTriggers = windowedDataStream.userTriggers;
this.userEvicters = windowedDataStream.userEvicters;
+ this.allCentral = windowedDataStream.allCentral;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -475,22 +485,27 @@ public class WindowedDataStream<OUT> {
+ /**
+ * Collects the trigger policies that are passed to the invokable as central
+ * triggers: every trigger from {@code getTriggers()} when
+ * {@code allCentral} is set, otherwise only the {@link TimeTriggerPolicy}
+ * instances.
+ */
private LinkedList<TriggerPolicy<OUT>> getCentralTriggers() {
LinkedList<TriggerPolicy<OUT>> cTriggers = new LinkedList<TriggerPolicy<OUT>>();
- // Add Time triggers to central triggers
- for (TriggerPolicy<OUT> trigger : getTriggers()) {
- if (trigger instanceof TimeTriggerPolicy) {
- cTriggers.add(trigger);
+ // allCentral: no trigger is distributed, so all of them are central
+ if (allCentral) {
+ cTriggers.addAll(getTriggers());
+ } else {
+ // Only time triggers are central; every other trigger is returned
+ // by getDistributedTriggers() instead
+ for (TriggerPolicy<OUT> trigger : getTriggers()) {
+ if (trigger instanceof TimeTriggerPolicy) {
+ cTriggers.add(trigger);
+ }
}
}
return cTriggers;
}
private LinkedList<CloneableTriggerPolicy<OUT>> getDistributedTriggers() {
- LinkedList<CloneableTriggerPolicy<OUT>> dTriggers = new LinkedList<CloneableTriggerPolicy<OUT>>();
-
- // Everything except Time triggers are distributed
- for (TriggerPolicy<OUT> trigger : getTriggers()) {
- if (!(trigger instanceof TimeTriggerPolicy)) {
- dTriggers.add((CloneableTriggerPolicy<OUT>) trigger);
+ LinkedList<CloneableTriggerPolicy<OUT>> dTriggers = null;
+
+ if (!allCentral) {
+ dTriggers = new LinkedList<CloneableTriggerPolicy<OUT>>();
+ for (TriggerPolicy<OUT> trigger : getTriggers()) {
+ if (!(trigger instanceof TimeTriggerPolicy)) {
+ dTriggers.add((CloneableTriggerPolicy<OUT>) trigger);
+ }
}
}
@@ -498,20 +513,32 @@ public class WindowedDataStream<OUT> {
}
+ /**
+ * Collects the eviction policies that are passed to the invokable as
+ * distributed evicters: all policies returned by {@code getEvicters()}
+ * unless {@code allCentral} is set, in which case {@code null} is returned.
+ */
private LinkedList<CloneableEvictionPolicy<OUT>> getDistributedEvicters() {
- LinkedList<CloneableEvictionPolicy<OUT>> evicters = new LinkedList<CloneableEvictionPolicy<OUT>>();
+ LinkedList<CloneableEvictionPolicy<OUT>> evicters = null;
- for (EvictionPolicy<OUT> evicter : getEvicters()) {
- evicters.add((CloneableEvictionPolicy<OUT>) evicter);
+ if (!allCentral) {
+ evicters = new LinkedList<CloneableEvictionPolicy<OUT>>();
+ for (EvictionPolicy<OUT> evicter : getEvicters()) {
+ // NOTE(review): unchecked cast; an evicter that is not a
+ // CloneableEvictionPolicy fails here with a ClassCastException
+ evicters.add((CloneableEvictionPolicy<OUT>) evicter);
+ }
}
return evicters;
}
+ /**
+ * Returns the eviction policies that are passed to the invokable as
+ * central evicters: the result of {@code getEvicters()} when
+ * {@code allCentral} is set, otherwise {@code null} (the
+ * GroupedWindowInvokable constructor replaces a null list with an empty
+ * one).
+ */
+ private LinkedList<EvictionPolicy<OUT>> getCentralEvicters() {
+ if (allCentral) {
+ return getEvicters();
+ } else {
+ return null;
+ }
+ }
+
private <R> StreamInvokable<OUT, R> getReduceGroupInvokable(GroupReduceFunction<OUT, R> reducer) {
StreamInvokable<OUT, R> invokable;
if (isGrouped) {
invokable = new GroupedWindowInvokable<OUT, R>(reducer, keySelector,
- getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers());
+ getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers(),
+ getCentralEvicters());
} else {
invokable = new WindowGroupReduceInvokable<OUT, R>(reducer, getTriggers(),
@@ -524,7 +551,8 @@ public class WindowedDataStream<OUT> {
StreamInvokable<OUT, OUT> invokable;
if (isGrouped) {
invokable = new GroupedWindowInvokable<OUT, OUT>(reducer, keySelector,
- getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers());
+ getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers(),
+ getCentralEvicters());
} else {
invokable = new WindowReduceInvokable<OUT>(reducer, getTriggers(), getEvicters());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f74e059/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
index 29e154b..905b45f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
@@ -83,17 +83,17 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
private KeySelector<IN, ?> keySelector;
private Configuration parameters;
- private LinkedList<ActiveTriggerPolicy<IN>> activeCentralTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>();
- private LinkedList<TriggerPolicy<IN>> centralTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+ private LinkedList<ActiveTriggerPolicy<IN>> activeCentralTriggerPolicies;
+ private LinkedList<TriggerPolicy<IN>> centralTriggerPolicies;
private LinkedList<ActiveEvictionPolicy<IN>> activeCentralEvictionPolicies;
private LinkedList<EvictionPolicy<IN>> centralEvictionPolicies;
- private LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<IN>>();
- private LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<IN>>();
- private Map<Object, WindowInvokable<IN, OUT>> windowingGroups = new HashMap<Object, WindowInvokable<IN, OUT>>();
- private LinkedList<Thread> activePolicyThreads = new LinkedList<Thread>();
- private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+ private LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies;
+ private LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies;
+ private Map<Object, WindowInvokable<IN, OUT>> windowingGroups;
+ private LinkedList<Thread> activePolicyThreads;
+ private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies;
private LinkedList<WindowInvokable<IN, OUT>> deleteOrderForCentralEviction;
-
+
/**
* This constructor creates an instance of the grouped windowing invokable.
*
@@ -158,64 +158,70 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
super(userFunction);
- // check that not both, central and distributed eviction, is used at the
- // same time.
- if (centralEvictionPolicies != null && distributedEvictionPolicies != null
- && !centralEvictionPolicies.isEmpty() && !distributedEvictionPolicies.isEmpty()) {
- throw new UnsupportedOperationException(
- "You can only use either central or distributed eviction policies but not both at the same time.");
- }
-
- // Check that there is at least one trigger and one eviction policy
- if ((centralEvictionPolicies == null || centralEvictionPolicies.isEmpty())
- && (distributedEvictionPolicies == null || distributedEvictionPolicies.isEmpty())) {
- throw new UnsupportedOperationException(
- "You have to define at least one eviction policy");
- }
- if ((centralTriggerPolicies == null || centralTriggerPolicies.isEmpty())
- && (distributedTriggerPolicies == null || distributedTriggerPolicies.isEmpty())) {
- throw new UnsupportedOperationException(
- "You have to define at least one trigger policy");
- }
-
this.keySelector = keySelector;
// handle the triggers
- this.centralTriggerPolicies = centralTriggerPolicies;
- this.distributedTriggerPolicies = distributedTriggerPolicies;
- for (TriggerPolicy<IN> trigger : centralTriggerPolicies) {
- if (trigger instanceof ActiveTriggerPolicy) {
- this.activeCentralTriggerPolicies.add((ActiveTriggerPolicy<IN>) trigger);
+ if (centralTriggerPolicies != null) {
+ this.centralTriggerPolicies = centralTriggerPolicies;
+ this.activeCentralTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>();
+
+ for (TriggerPolicy<IN> trigger : centralTriggerPolicies) {
+ if (trigger instanceof ActiveTriggerPolicy) {
+ this.activeCentralTriggerPolicies.add((ActiveTriggerPolicy<IN>) trigger);
+ }
}
+ } else {
+ this.centralTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
}
- // handle the evictions
- if (distributedEvictionPolicies != null && !distributedEvictionPolicies.isEmpty()) {
+ if (distributedTriggerPolicies != null) {
+ this.distributedTriggerPolicies = distributedTriggerPolicies;
+ } else {
+ this.distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<IN>>();
+ }
+
+ if (distributedEvictionPolicies != null) {
this.distributedEvictionPolicies = distributedEvictionPolicies;
- } else { // (centralEvictionPolicies!=null&&!centralEvictionPolicies.isEmpty())
+ } else {
+ this.distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<IN>>();
+ }
+
+ this.activeCentralEvictionPolicies = new LinkedList<ActiveEvictionPolicy<IN>>();
+
+ if (centralEvictionPolicies != null) {
this.centralEvictionPolicies = centralEvictionPolicies;
- this.activeCentralEvictionPolicies = new LinkedList<ActiveEvictionPolicy<IN>>();
+
for (EvictionPolicy<IN> eviction : centralEvictionPolicies) {
if (eviction instanceof ActiveEvictionPolicy) {
this.activeCentralEvictionPolicies.add((ActiveEvictionPolicy<IN>) eviction);
}
}
- this.deleteOrderForCentralEviction = new LinkedList<WindowInvokable<IN, OUT>>();
+ } else {
+ this.centralEvictionPolicies = new LinkedList<EvictionPolicy<IN>>();
+ }
+
+ this.windowingGroups = new HashMap<Object, WindowInvokable<IN, OUT>>();
+ this.activePolicyThreads = new LinkedList<Thread>();
+ this.currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+ this.deleteOrderForCentralEviction = new LinkedList<WindowInvokable<IN, OUT>>();
+
+ // check that not both, central and distributed eviction, is used at the
+ // same time.
+ if (!this.centralEvictionPolicies.isEmpty() && !this.distributedEvictionPolicies.isEmpty()) {
+ throw new UnsupportedOperationException(
+ "You can only use either central or distributed eviction policies but not both at the same time.");
+ }
+
+ // Check that there is at least one trigger and one eviction policy
+ if (this.centralEvictionPolicies.isEmpty() && this.distributedEvictionPolicies.isEmpty()) {
+ throw new UnsupportedOperationException(
+ "You have to define at least one eviction policy");
+ }
+ if (this.centralTriggerPolicies.isEmpty() && this.distributedTriggerPolicies.isEmpty()) {
+ throw new UnsupportedOperationException(
+ "You have to define at least one trigger policy");
}
- }
-
- /**
- * Same as
- * {@link GroupedWindowInvokable#GroupedWindowInvokable(Function, KeySelector, LinkedList, LinkedList, LinkedList, LinkedList)}
- * but using always distributed eviction only.
- */
- public GroupedWindowInvokable(Function userFunction, KeySelector<IN, ?> keySelector,
- LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies,
- LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies,
- LinkedList<TriggerPolicy<IN>> centralTriggerPolicies) {
- this(userFunction, keySelector, distributedTriggerPolicies, distributedEvictionPolicies,
- centralTriggerPolicies, null);
}
@Override
@@ -239,7 +245,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
for (Object in : result) {
// If central eviction is used, handle it here
- if (activeCentralEvictionPolicies!=null) {
+ if (!activeCentralEvictionPolicies.isEmpty()) {
evictElements(centralActiveEviction(in));
}
@@ -263,7 +269,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
groupInvokable.processRealElement(reuse.getObject());
// If central eviction is used, handle it here
- if (centralEvictionPolicies!=null) {
+ if (!centralEvictionPolicies.isEmpty()) {
evictElements(centralEviction(reuse.getObject(), false));
deleteOrderForCentralEviction.add(groupInvokable);
}
@@ -283,7 +289,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
}
// If central eviction is used, handle it here
- if (centralEvictionPolicies!=null) {
+ if (!centralEvictionPolicies.isEmpty()) {
evictElements(centralEviction(reuse.getObject(), true));
deleteOrderForCentralEviction.add(groupInvokable);
}
@@ -473,7 +479,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
public void sendFakeElement(Object datapoint) {
// If central eviction is used, handle it here
- if (centralEvictionPolicies!=null) {
+ if (!centralEvictionPolicies.isEmpty()) {
evictElements(centralActiveEviction(datapoint));
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f74e059/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
index 740fa4f..bbc1277 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
@@ -45,9 +45,9 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
private LinkedList<EvictionPolicy<IN>> evictionPolicies;
private LinkedList<ActiveTriggerPolicy<IN>> activeTriggerPolicies;
private LinkedList<ActiveEvictionPolicy<IN>> activeEvictionPolicies;
- private LinkedList<Thread> activePolicyTreads = new LinkedList<Thread>();
- protected LinkedList<IN> buffer = new LinkedList<IN>();
- private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+ private LinkedList<Thread> activePolicyTreads;
+ protected LinkedList<IN> buffer;
+ private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies;
/**
* This constructor created a windowing invokable using trigger and eviction
@@ -82,6 +82,10 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
activeEvictionPolicies.add((ActiveEvictionPolicy<IN>) ep);
}
}
+
+ this.activePolicyTreads = new LinkedList<Thread>();
+ this.buffer = new LinkedList<IN>();
+ this.currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
}
@Override
@@ -156,12 +160,12 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
* of this group.
*
* Remark: This is NOT the same as
- * {@link WindowInvokable#processFakeElement(Object, TriggerPolicy)}!
- * Here the eviction using active policies takes place after the call to the
- * UDF. Usually it is done before when fake elements get submitted. This
- * special behaviour is needed to allow the
- * {@link GroupedWindowInvokable} to send central triggers to all groups,
- * even if the current element does not belong to the group.
+ * {@link WindowInvokable#processFakeElement(Object, TriggerPolicy)}! Here
+ * the eviction using active policies takes place after the call to the UDF.
+ * Usually it is done before when fake elements get submitted. This special
+ * behaviour is needed to allow the {@link GroupedWindowInvokable} to send
+ * central triggers to all groups, even if the current element does not
+ * belong to the group.
*
* @param input
* a fake input element
@@ -339,7 +343,7 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
buffer.add(input);
}
-
+
/**
* This method removes the first element from the element buffer. It is used
* to provide central evictions in {@link GroupedWindowInvokable}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f74e059/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
index fba73be..128d3bf 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
@@ -36,7 +36,7 @@ import org.apache.flink.util.Collector;
*/
public class TimeWindowingExample {
- private static final int PARALLELISM = 2;
+ private static final int PARALLELISM = 1;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
@@ -45,21 +45,26 @@ public class TimeWindowingExample {
// Prevent output from being blocked
env.setBufferTimeout(100);
- DataStream<Integer> stream = env.addSource(new CountingSourceWithSleep())
- .groupBy(new KeySelector<Integer, Integer>() {
+ KeySelector<Integer, Integer> myKey = new KeySelector<Integer, Integer>() {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 1L;
- @Override
- public Integer getKey(Integer value) throws Exception {
- if (value < 3) {
- return 0;
- } else {
- return 1;
- }
- }
+ @Override
+ public Integer getKey(Integer value) throws Exception {
+ if (value < 2) {
+ return 0;
+ } else {
+ return 1;
+ }
+ }
- }).window(Count.of(100)).every(Time.of(1000, TimeUnit.MILLISECONDS)).sum();
+ };
+
+ DataStream<Integer> stream = env.addSource(new CountingSourceWithSleep())
+ .window(Count.of(100))
+ .every(Time.of(1000, TimeUnit.MILLISECONDS))
+ .groupBy(myKey)
+ .sum();
stream.print();