You are viewing a plain text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:35 UTC

[30/34] incubator-flink git commit: [streaming] Central trigger/eviction API integration + invokable refactor

[streaming] Central trigger/eviction API integration + invokable refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3f74e059
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3f74e059
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3f74e059

Branch: refs/heads/master
Commit: 3f74e0597e62e0392dae337ca26d107538544513
Parents: b082af0
Author: Gyula Fora <gy...@apache.org>
Authored: Fri Nov 28 17:25:31 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Fri Dec 5 16:45:10 2014 +0100

----------------------------------------------------------------------
 .../api/datastream/WindowedDataStream.java      |  58 +++++++---
 .../operator/GroupedWindowInvokable.java        | 116 ++++++++++---------
 .../api/invokable/operator/WindowInvokable.java |  24 ++--
 .../windowing/TimeWindowingExample.java         |  31 ++---
 4 files changed, 136 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f74e059/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 622c03d..15e5110 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -55,6 +55,7 @@ public class WindowedDataStream<OUT> {
 
 	protected DataStream<OUT> dataStream;
 	protected boolean isGrouped;
+	protected boolean allCentral;
 	protected KeySelector<OUT, ?> keySelector;
 
 	protected List<WindowingHelper<OUT>> triggerHelpers;
@@ -73,9 +74,13 @@ public class WindowedDataStream<OUT> {
 		if (dataStream instanceof GroupedDataStream) {
 			this.isGrouped = true;
 			this.keySelector = ((GroupedDataStream<OUT>) dataStream).keySelector;
+			// set all policies distributed
+			this.allCentral = false;
 
 		} else {
 			this.isGrouped = false;
+			// set all policies central
+			this.allCentral = true;
 		}
 	}
 
@@ -96,9 +101,13 @@ public class WindowedDataStream<OUT> {
 		if (dataStream instanceof GroupedDataStream) {
 			this.isGrouped = true;
 			this.keySelector = ((GroupedDataStream<OUT>) dataStream).keySelector;
+			// set all policies distributed
+			this.allCentral = false;
 
 		} else {
 			this.isGrouped = false;
+			// set all policies central
+			this.allCentral = true;
 		}
 	}
 
@@ -110,6 +119,7 @@ public class WindowedDataStream<OUT> {
 		this.evictionHelpers = windowedDataStream.evictionHelpers;
 		this.userTriggers = windowedDataStream.userTriggers;
 		this.userEvicters = windowedDataStream.userEvicters;
+		this.allCentral = windowedDataStream.allCentral;
 	}
 
 	@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -475,22 +485,27 @@ public class WindowedDataStream<OUT> {
 
 	private LinkedList<TriggerPolicy<OUT>> getCentralTriggers() {
 		LinkedList<TriggerPolicy<OUT>> cTriggers = new LinkedList<TriggerPolicy<OUT>>();
-		// Add Time triggers to central triggers
-		for (TriggerPolicy<OUT> trigger : getTriggers()) {
-			if (trigger instanceof TimeTriggerPolicy) {
-				cTriggers.add(trigger);
+		if (allCentral) {
+			cTriggers.addAll(getTriggers());
+		} else {
+			for (TriggerPolicy<OUT> trigger : getTriggers()) {
+				if (trigger instanceof TimeTriggerPolicy) {
+					cTriggers.add(trigger);
+				}
 			}
 		}
 		return cTriggers;
 	}
 
 	private LinkedList<CloneableTriggerPolicy<OUT>> getDistributedTriggers() {
-		LinkedList<CloneableTriggerPolicy<OUT>> dTriggers = new LinkedList<CloneableTriggerPolicy<OUT>>();
-
-		// Everything except Time triggers are distributed
-		for (TriggerPolicy<OUT> trigger : getTriggers()) {
-			if (!(trigger instanceof TimeTriggerPolicy)) {
-				dTriggers.add((CloneableTriggerPolicy<OUT>) trigger);
+		LinkedList<CloneableTriggerPolicy<OUT>> dTriggers = null;
+
+		if (!allCentral) {
+			dTriggers = new LinkedList<CloneableTriggerPolicy<OUT>>();
+			for (TriggerPolicy<OUT> trigger : getTriggers()) {
+				if (!(trigger instanceof TimeTriggerPolicy)) {
+					dTriggers.add((CloneableTriggerPolicy<OUT>) trigger);
+				}
 			}
 		}
 
@@ -498,20 +513,32 @@ public class WindowedDataStream<OUT> {
 	}
 
 	private LinkedList<CloneableEvictionPolicy<OUT>> getDistributedEvicters() {
-		LinkedList<CloneableEvictionPolicy<OUT>> evicters = new LinkedList<CloneableEvictionPolicy<OUT>>();
+		LinkedList<CloneableEvictionPolicy<OUT>> evicters = null;
 
-		for (EvictionPolicy<OUT> evicter : getEvicters()) {
-			evicters.add((CloneableEvictionPolicy<OUT>) evicter);
+		if (!allCentral) {
+			evicters = new LinkedList<CloneableEvictionPolicy<OUT>>();
+			for (EvictionPolicy<OUT> evicter : getEvicters()) {
+				evicters.add((CloneableEvictionPolicy<OUT>) evicter);
+			}
 		}
 
 		return evicters;
 	}
 
+	private LinkedList<EvictionPolicy<OUT>> getCentralEvicters() {
+		if (allCentral) {
+			return getEvicters();
+		} else {
+			return null;
+		}
+	}
+
 	private <R> StreamInvokable<OUT, R> getReduceGroupInvokable(GroupReduceFunction<OUT, R> reducer) {
 		StreamInvokable<OUT, R> invokable;
 		if (isGrouped) {
 			invokable = new GroupedWindowInvokable<OUT, R>(reducer, keySelector,
-					getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers());
+					getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers(),
+					getCentralEvicters());
 
 		} else {
 			invokable = new WindowGroupReduceInvokable<OUT, R>(reducer, getTriggers(),
@@ -524,7 +551,8 @@ public class WindowedDataStream<OUT> {
 		StreamInvokable<OUT, OUT> invokable;
 		if (isGrouped) {
 			invokable = new GroupedWindowInvokable<OUT, OUT>(reducer, keySelector,
-					getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers());
+					getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers(),
+					getCentralEvicters());
 
 		} else {
 			invokable = new WindowReduceInvokable<OUT>(reducer, getTriggers(), getEvicters());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f74e059/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
index 29e154b..905b45f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
@@ -83,17 +83,17 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 
 	private KeySelector<IN, ?> keySelector;
 	private Configuration parameters;
-	private LinkedList<ActiveTriggerPolicy<IN>> activeCentralTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>();
-	private LinkedList<TriggerPolicy<IN>> centralTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+	private LinkedList<ActiveTriggerPolicy<IN>> activeCentralTriggerPolicies;
+	private LinkedList<TriggerPolicy<IN>> centralTriggerPolicies;
 	private LinkedList<ActiveEvictionPolicy<IN>> activeCentralEvictionPolicies;
 	private LinkedList<EvictionPolicy<IN>> centralEvictionPolicies;
-	private LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<IN>>();
-	private LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<IN>>();
-	private Map<Object, WindowInvokable<IN, OUT>> windowingGroups = new HashMap<Object, WindowInvokable<IN, OUT>>();
-	private LinkedList<Thread> activePolicyThreads = new LinkedList<Thread>();
-	private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+	private LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies;
+	private LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies;
+	private Map<Object, WindowInvokable<IN, OUT>> windowingGroups;
+	private LinkedList<Thread> activePolicyThreads;
+	private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies;
 	private LinkedList<WindowInvokable<IN, OUT>> deleteOrderForCentralEviction;
-	
+
 	/**
 	 * This constructor creates an instance of the grouped windowing invokable.
 	 * 
@@ -158,64 +158,70 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 
 		super(userFunction);
 
-		// check that not both, central and distributed eviction, is used at the
-		// same time.
-		if (centralEvictionPolicies != null && distributedEvictionPolicies != null
-				&& !centralEvictionPolicies.isEmpty() && !distributedEvictionPolicies.isEmpty()) {
-			throw new UnsupportedOperationException(
-					"You can only use either central or distributed eviction policies but not both at the same time.");
-		}
-
-		// Check that there is at least one trigger and one eviction policy
-		if ((centralEvictionPolicies == null || centralEvictionPolicies.isEmpty())
-				&& (distributedEvictionPolicies == null || distributedEvictionPolicies.isEmpty())) {
-			throw new UnsupportedOperationException(
-					"You have to define at least one eviction policy");
-		}
-		if ((centralTriggerPolicies == null || centralTriggerPolicies.isEmpty())
-				&& (distributedTriggerPolicies == null || distributedTriggerPolicies.isEmpty())) {
-			throw new UnsupportedOperationException(
-					"You have to define at least one trigger policy");
-		}
-
 		this.keySelector = keySelector;
 
 		// handle the triggers
-		this.centralTriggerPolicies = centralTriggerPolicies;
-		this.distributedTriggerPolicies = distributedTriggerPolicies;
-		for (TriggerPolicy<IN> trigger : centralTriggerPolicies) {
-			if (trigger instanceof ActiveTriggerPolicy) {
-				this.activeCentralTriggerPolicies.add((ActiveTriggerPolicy<IN>) trigger);
+		if (centralTriggerPolicies != null) {
+			this.centralTriggerPolicies = centralTriggerPolicies;
+			this.activeCentralTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>();
+
+			for (TriggerPolicy<IN> trigger : centralTriggerPolicies) {
+				if (trigger instanceof ActiveTriggerPolicy) {
+					this.activeCentralTriggerPolicies.add((ActiveTriggerPolicy<IN>) trigger);
+				}
 			}
+		} else {
+			this.centralTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
 		}
 
-		// handle the evictions
-		if (distributedEvictionPolicies != null && !distributedEvictionPolicies.isEmpty()) {
+		if (distributedTriggerPolicies != null) {
+			this.distributedTriggerPolicies = distributedTriggerPolicies;
+		} else {
+			this.distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<IN>>();
+		}
+
+		if (distributedEvictionPolicies != null) {
 			this.distributedEvictionPolicies = distributedEvictionPolicies;
-		} else { // (centralEvictionPolicies!=null&&!centralEvictionPolicies.isEmpty())
+		} else {
+			this.distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<IN>>();
+		}
+
+		this.activeCentralEvictionPolicies = new LinkedList<ActiveEvictionPolicy<IN>>();
+
+		if (centralEvictionPolicies != null) {
 			this.centralEvictionPolicies = centralEvictionPolicies;
-			this.activeCentralEvictionPolicies = new LinkedList<ActiveEvictionPolicy<IN>>();
+
 			for (EvictionPolicy<IN> eviction : centralEvictionPolicies) {
 				if (eviction instanceof ActiveEvictionPolicy) {
 					this.activeCentralEvictionPolicies.add((ActiveEvictionPolicy<IN>) eviction);
 				}
 			}
-			this.deleteOrderForCentralEviction = new LinkedList<WindowInvokable<IN, OUT>>();
+		} else {
+			this.centralEvictionPolicies = new LinkedList<EvictionPolicy<IN>>();
+		}
+
+		this.windowingGroups = new HashMap<Object, WindowInvokable<IN, OUT>>();
+		this.activePolicyThreads = new LinkedList<Thread>();
+		this.currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+		this.deleteOrderForCentralEviction = new LinkedList<WindowInvokable<IN, OUT>>();
+
+		// check that not both, central and distributed eviction, is used at the
+		// same time.
+		if (!this.centralEvictionPolicies.isEmpty() && !this.distributedEvictionPolicies.isEmpty()) {
+			throw new UnsupportedOperationException(
+					"You can only use either central or distributed eviction policies but not both at the same time.");
+		}
+
+		// Check that there is at least one trigger and one eviction policy
+		if (this.centralEvictionPolicies.isEmpty() && this.distributedEvictionPolicies.isEmpty()) {
+			throw new UnsupportedOperationException(
+					"You have to define at least one eviction policy");
+		}
+		if (this.centralTriggerPolicies.isEmpty() && this.distributedTriggerPolicies.isEmpty()) {
+			throw new UnsupportedOperationException(
+					"You have to define at least one trigger policy");
 		}
 
-	}
-	
-	/**
-	 * Same as
-	 * {@link GroupedWindowInvokable#GroupedWindowInvokable(Function, KeySelector, LinkedList, LinkedList, LinkedList, LinkedList)}
-	 * but using always distributed eviction only.
-	 */
-	public GroupedWindowInvokable(Function userFunction, KeySelector<IN, ?> keySelector,
-			LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies,
-			LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies,
-			LinkedList<TriggerPolicy<IN>> centralTriggerPolicies) {
-		this(userFunction, keySelector, distributedTriggerPolicies, distributedEvictionPolicies,
-				centralTriggerPolicies, null);
 	}
 
 	@Override
@@ -239,7 +245,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 				for (Object in : result) {
 
 					// If central eviction is used, handle it here
-					if (activeCentralEvictionPolicies!=null) {
+					if (!activeCentralEvictionPolicies.isEmpty()) {
 						evictElements(centralActiveEviction(in));
 					}
 
@@ -263,7 +269,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 				groupInvokable.processRealElement(reuse.getObject());
 
 				// If central eviction is used, handle it here
-				if (centralEvictionPolicies!=null) {
+				if (!centralEvictionPolicies.isEmpty()) {
 					evictElements(centralEviction(reuse.getObject(), false));
 					deleteOrderForCentralEviction.add(groupInvokable);
 				}
@@ -283,7 +289,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 				}
 
 				// If central eviction is used, handle it here
-				if (centralEvictionPolicies!=null) {
+				if (!centralEvictionPolicies.isEmpty()) {
 					evictElements(centralEviction(reuse.getObject(), true));
 					deleteOrderForCentralEviction.add(groupInvokable);
 				}
@@ -473,7 +479,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 		public void sendFakeElement(Object datapoint) {
 
 			// If central eviction is used, handle it here
-			if (centralEvictionPolicies!=null) {
+			if (!centralEvictionPolicies.isEmpty()) {
 				evictElements(centralActiveEviction(datapoint));
 			}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f74e059/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
index 740fa4f..bbc1277 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
@@ -45,9 +45,9 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
 	private LinkedList<EvictionPolicy<IN>> evictionPolicies;
 	private LinkedList<ActiveTriggerPolicy<IN>> activeTriggerPolicies;
 	private LinkedList<ActiveEvictionPolicy<IN>> activeEvictionPolicies;
-	private LinkedList<Thread> activePolicyTreads = new LinkedList<Thread>();
-	protected LinkedList<IN> buffer = new LinkedList<IN>();
-	private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+	private LinkedList<Thread> activePolicyTreads;
+	protected LinkedList<IN> buffer;
+	private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies;
 
 	/**
 	 * This constructor created a windowing invokable using trigger and eviction
@@ -82,6 +82,10 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
 				activeEvictionPolicies.add((ActiveEvictionPolicy<IN>) ep);
 			}
 		}
+
+		this.activePolicyTreads = new LinkedList<Thread>();
+		this.buffer = new LinkedList<IN>();
+		this.currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
 	}
 
 	@Override
@@ -156,12 +160,12 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
 	 * of this group.
 	 * 
 	 * Remark: This is NOT the same as
-	 * {@link WindowInvokable#processFakeElement(Object, TriggerPolicy)}!
-	 * Here the eviction using active policies takes place after the call to the
-	 * UDF. Usually it is done before when fake elements get submitted. This
-	 * special behaviour is needed to allow the
-	 * {@link GroupedWindowInvokable} to send central triggers to all groups,
-	 * even if the current element does not belong to the group.
+	 * {@link WindowInvokable#processFakeElement(Object, TriggerPolicy)}! Here
+	 * the eviction using active policies takes place after the call to the UDF.
+	 * Usually it is done before when fake elements get submitted. This special
+	 * behaviour is needed to allow the {@link GroupedWindowInvokable} to send
+	 * central triggers to all groups, even if the current element does not
+	 * belong to the group.
 	 * 
 	 * @param input
 	 *            a fake input element
@@ -339,7 +343,7 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
 		buffer.add(input);
 
 	}
-	
+
 	/**
 	 * This method removes the first element from the element buffer. It is used
 	 * to provide central evictions in {@link GroupedWindowInvokable}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f74e059/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
index fba73be..128d3bf 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
@@ -36,7 +36,7 @@ import org.apache.flink.util.Collector;
  */
 public class TimeWindowingExample {
 
-	private static final int PARALLELISM = 2;
+	private static final int PARALLELISM = 1;
 
 	public static void main(String[] args) throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment
@@ -45,21 +45,26 @@ public class TimeWindowingExample {
 		// Prevent output from being blocked
 		env.setBufferTimeout(100);
 
-		DataStream<Integer> stream = env.addSource(new CountingSourceWithSleep())
-				.groupBy(new KeySelector<Integer, Integer>() {
+		KeySelector<Integer, Integer> myKey = new KeySelector<Integer, Integer>() {
 
-					private static final long serialVersionUID = 1L;
+			private static final long serialVersionUID = 1L;
 
-					@Override
-					public Integer getKey(Integer value) throws Exception {
-						if (value < 3) {
-							return 0;
-						} else {
-							return 1;
-						}
-					}
+			@Override
+			public Integer getKey(Integer value) throws Exception {
+				if (value < 2) {
+					return 0;
+				} else {
+					return 1;
+				}
+			}
 
-				}).window(Count.of(100)).every(Time.of(1000, TimeUnit.MILLISECONDS)).sum();
+		};
+
+		DataStream<Integer> stream = env.addSource(new CountingSourceWithSleep())
+				.window(Count.of(100))
+				.every(Time.of(1000, TimeUnit.MILLISECONDS))
+				.groupBy(myKey)
+				.sum();
 
 		stream.print();