You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:36 UTC

[31/34] incubator-flink git commit: [streaming] Introduced central eviction policies in the grouped window invokable

[streaming] Introduced central eviction policies in the grouped window invokable

[streaming] Extended test cases for grouped window invokable to cover usage of central eviction policies


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b082af06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b082af06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b082af06

Branch: refs/heads/master
Commit: b082af06549f639d910173cac9589d23778cf350
Parents: 240e5f1
Author: Jonas Traub (powibol) <jo...@s-traub.com>
Authored: Tue Nov 25 14:55:52 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Fri Dec 5 16:45:10 2014 +0100

----------------------------------------------------------------------
 .../operator/GroupedWindowInvokable.java        | 223 ++++++++++++++++---
 .../api/invokable/operator/WindowInvokable.java |  12 +
 .../operator/GroupedWindowInvokableTest.java    | 202 ++++++++++++++---
 3 files changed, 382 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b082af06/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
index 844a488..29e154b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.invokable.operator;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -28,6 +29,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
 import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
@@ -43,21 +45,21 @@ import org.slf4j.LoggerFactory;
  * versions. It is additionally aware of the creation of windows per group.
  * 
  * A {@link KeySelector} is used to specify the key position or key extraction.
- * The {@link ReduceFunction} will be executed on each group separately. Trigger
- * policies might either be centralized or distributed. Eviction policies are
- * always distributed. A distributed policy have to be a
- * {@link CloneableTriggerPolicy} or {@link CloneableEvictionPolicy} as it will
- * be cloned to have separated instances for each group. At the startup time the
- * distributed policies will be stored as sample, and only clones of them will
- * be used to maintain the groups. Therefore, each group starts with the initial
- * policy states.
+ * The {@link ReduceFunction} will be executed on each group separately.
+ * Policies might either be centralized or distributed. It is not possible to
+ * use central and distributed eviction policies at the same time. A distributed
+ * policy has to be a {@link CloneableTriggerPolicy} or
+ * {@link CloneableEvictionPolicy} as it will be cloned to have separated
+ * instances for each group. At the startup time the distributed policies will
+ * be stored as sample, and only clones of them will be used to maintain the
+ * groups. Therefore, each group starts with the initial policy states.
  * 
  * While a distributed policy only gets notified with the elements belonging to
  * the respective group, a centralized policy get notified with all arriving
  * elements. When a centralized trigger occurred, all groups get triggered. This
  * is done by submitting the element which caused the trigger as real element to
  * the groups it belongs to and as fake element to all other groups. Within the
- * groups the element might be further processed, causing more triggered,
+ * groups the element might be further processed, causing more triggers,
  * prenotifications of active distributed policies and evictions like usual.
  * 
  * Central policies can be instance of {@link ActiveTriggerPolicy} and also
@@ -83,23 +85,28 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 	private Configuration parameters;
 	private LinkedList<ActiveTriggerPolicy<IN>> activeCentralTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>();
 	private LinkedList<TriggerPolicy<IN>> centralTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+	private LinkedList<ActiveEvictionPolicy<IN>> activeCentralEvictionPolicies;
+	private LinkedList<EvictionPolicy<IN>> centralEvictionPolicies;
 	private LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<IN>>();
 	private LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<IN>>();
 	private Map<Object, WindowInvokable<IN, OUT>> windowingGroups = new HashMap<Object, WindowInvokable<IN, OUT>>();
 	private LinkedList<Thread> activePolicyThreads = new LinkedList<Thread>();
 	private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
-
+	private LinkedList<WindowInvokable<IN, OUT>> deleteOrderForCentralEviction;
+	
 	/**
 	 * This constructor creates an instance of the grouped windowing invokable.
+	 * 
 	 * A {@link KeySelector} is used to specify the key position or key
 	 * extraction. The {@link ReduceFunction} will be executed on each group
-	 * separately. Trigger policies might either be centralized or distributed.
-	 * Eviction policies are always distributed. A distributed policy have to be
-	 * a {@link CloneableTriggerPolicy} or {@link CloneableEvictionPolicy} as it
-	 * will be cloned to have separated instances for each group. At the startup
-	 * time the distributed policies will be stored as sample, and only clones
-	 * of them will be used to maintain the groups. Therefore, each group starts
-	 * with the initial policy states.
+	 * separately. Policies might either be centralized or distributed. It is
+	 * not possible to use central and distributed eviction policies at the same
+	 * time. A distributed policy has to be a {@link CloneableTriggerPolicy} or
+	 * {@link CloneableEvictionPolicy} as it will be cloned to have separated
+	 * instances for each group. At the startup time the distributed policies
+	 * will be stored as sample, and only clones of them will be used to
+	 * maintain the groups. Therefore, each group starts with the initial policy
+	 * states.
 	 * 
 	 * While a distributed policy only gets notified with the elements belonging
 	 * to the respective group, a centralized policy get notified with all
@@ -107,7 +114,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 	 * triggered. This is done by submitting the element which caused the
 	 * trigger as real element to the groups it belongs to and as fake element
 	 * to all other groups. Within the groups the element might be further
-	 * processed, causing more triggered, prenotifications of active distributed
+	 * processed, causing more triggers, prenotifications of active distributed
 	 * policies and evictions like usual.
 	 * 
 	 * Central policies can be instance of {@link ActiveTriggerPolicy} and also
@@ -127,33 +134,88 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 	 *            within each group.
 	 * @param distributedEvictionPolicies
 	 *            Eviction policies to be distributed and maintained
-	 *            individually within each group. There are no central eviction
-	 *            policies because there is no central element buffer but only a
-	 *            buffer per group. Therefore evictions might always be done per
-	 *            group.
+	 *            individually within each group. Note that there cannot be
+	 *            both, central and distributed eviction policies at the same
+	 *            time.
 	 * @param centralTriggerPolicies
 	 *            Trigger policies which will only exist once at a central
 	 *            place. In case a central policy triggers, it will cause all
 	 *            groups to be emitted. (Remark: Empty groups cannot be emitted.
 	 *            If only one element is contained a group, this element itself
 	 *            is returned as aggregated result.)
+	 * @param centralEvictionPolicies
+	 *            Eviction policies which will only exist once at a central
+	 *            place. Note that there cannot be both, central and distributed
+	 *            eviction policies at the same time. The central eviction
+	 *            policies work on a simulated element buffer containing all
+	 *            elements no matter which group they belong to.
 	 */
 	public GroupedWindowInvokable(Function userFunction, KeySelector<IN, ?> keySelector,
 			LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies,
 			LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies,
-			LinkedList<TriggerPolicy<IN>> centralTriggerPolicies) {
+			LinkedList<TriggerPolicy<IN>> centralTriggerPolicies,
+			LinkedList<EvictionPolicy<IN>> centralEvictionPolicies) {
 
 		super(userFunction);
+
+		// check that not both, central and distributed eviction, is used at the
+		// same time.
+		if (centralEvictionPolicies != null && distributedEvictionPolicies != null
+				&& !centralEvictionPolicies.isEmpty() && !distributedEvictionPolicies.isEmpty()) {
+			throw new UnsupportedOperationException(
+					"You can only use either central or distributed eviction policies but not both at the same time.");
+		}
+
+		// Check that there is at least one trigger and one eviction policy
+		if ((centralEvictionPolicies == null || centralEvictionPolicies.isEmpty())
+				&& (distributedEvictionPolicies == null || distributedEvictionPolicies.isEmpty())) {
+			throw new UnsupportedOperationException(
+					"You have to define at least one eviction policy");
+		}
+		if ((centralTriggerPolicies == null || centralTriggerPolicies.isEmpty())
+				&& (distributedTriggerPolicies == null || distributedTriggerPolicies.isEmpty())) {
+			throw new UnsupportedOperationException(
+					"You have to define at least one trigger policy");
+		}
+
 		this.keySelector = keySelector;
+
+		// handle the triggers
 		this.centralTriggerPolicies = centralTriggerPolicies;
 		this.distributedTriggerPolicies = distributedTriggerPolicies;
-		this.distributedEvictionPolicies = distributedEvictionPolicies;
-
 		for (TriggerPolicy<IN> trigger : centralTriggerPolicies) {
 			if (trigger instanceof ActiveTriggerPolicy) {
 				this.activeCentralTriggerPolicies.add((ActiveTriggerPolicy<IN>) trigger);
 			}
 		}
+
+		// handle the evictions
+		if (distributedEvictionPolicies != null && !distributedEvictionPolicies.isEmpty()) {
+			this.distributedEvictionPolicies = distributedEvictionPolicies;
+		} else { // (centralEvictionPolicies!=null&&!centralEvictionPolicies.isEmpty())
+			this.centralEvictionPolicies = centralEvictionPolicies;
+			this.activeCentralEvictionPolicies = new LinkedList<ActiveEvictionPolicy<IN>>();
+			for (EvictionPolicy<IN> eviction : centralEvictionPolicies) {
+				if (eviction instanceof ActiveEvictionPolicy) {
+					this.activeCentralEvictionPolicies.add((ActiveEvictionPolicy<IN>) eviction);
+				}
+			}
+			this.deleteOrderForCentralEviction = new LinkedList<WindowInvokable<IN, OUT>>();
+		}
+
+	}
+	
+	/**
+	 * Convenience constructor which delegates to
+	 * {@link GroupedWindowInvokable#GroupedWindowInvokable(Function, KeySelector, LinkedList, LinkedList, LinkedList, LinkedList)}
+	 * and passes <code>null</code> as central eviction policies.
+	 */
+	public GroupedWindowInvokable(Function userFunction, KeySelector<IN, ?> keySelector,
+			LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies,
+			LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies,
+			LinkedList<TriggerPolicy<IN>> centralTriggerPolicies) {
+		this(userFunction, keySelector, distributedTriggerPolicies, distributedEvictionPolicies,
+				centralTriggerPolicies, null);
+	}
 
 	@Override
@@ -165,8 +227,8 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 
 		// Continuously run
 		while (reuse != null) {
-			WindowInvokable<IN, OUT> groupInvokable = windowingGroups.get(keySelector
-					.getKey(reuse.getObject()));
+			WindowInvokable<IN, OUT> groupInvokable = windowingGroups.get(keySelector.getKey(reuse
+					.getObject()));
 			if (groupInvokable == null) {
 				groupInvokable = makeNewGroup(reuse);
 			}
@@ -175,6 +237,13 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 			for (ActiveTriggerPolicy<IN> trigger : activeCentralTriggerPolicies) {
 				Object[] result = trigger.preNotifyTrigger(reuse.getObject());
 				for (Object in : result) {
+
+					// If central eviction is used, handle it here
+					if (activeCentralEvictionPolicies!=null) {
+						evictElements(centralActiveEviction(in));
+					}
+
+					// process in groups
 					for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
 						group.processFakeElement(in, trigger);
 					}
@@ -189,9 +258,18 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 			}
 
 			if (currentTriggerPolicies.isEmpty()) {
+
 				// only add the element to its group
 				groupInvokable.processRealElement(reuse.getObject());
+
+				// If central eviction is used, handle it here
+				if (centralEvictionPolicies!=null) {
+					evictElements(centralEviction(reuse.getObject(), false));
+					deleteOrderForCentralEviction.add(groupInvokable);
+				}
+
 			} else {
+
 				// call user function for all groups
 				for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
 					if (group == groupInvokable) {
@@ -203,6 +281,12 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 						group.externalTriggerFakeElement(reuse.getObject(), currentTriggerPolicies);
 					}
 				}
+
+				// If central eviction is used, handle it here
+				if (centralEvictionPolicies!=null) {
+					evictElements(centralEviction(reuse.getObject(), true));
+					deleteOrderForCentralEviction.add(groupInvokable);
+				}
 			}
 
 			// clear current trigger list
@@ -228,8 +312,8 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 	/**
 	 * This method creates a new group. The method gets called in case an
 	 * element arrives which has a key which was not seen before. The method
-	 * created a nested {@link WindowInvokable} and therefore created clones
-	 * of all distributed trigger and eviction policies.
+	 * creates a nested {@link WindowInvokable} and therefore creates clones of
+	 * all distributed trigger and eviction policies.
 	 * 
 	 * @param element
 	 *            The element which leads to the generation of a new group
@@ -299,6 +383,80 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 	};
 
 	/**
+	 * This method is used to notify central eviction policies with a real
+	 * element.
+	 * 
+	 * @param input
+	 *            the real element to notify the eviction policy.
+	 * @param triggered
+	 *            whether a central trigger occurred or not.
+	 * @return The number of elements to be deleted from the buffer.
+	 */
+	private int centralEviction(IN input, boolean triggered) {
+		// Every central policy is notified exactly once per element. When
+		// several policies ask for evictions, the requests are not summed
+		// up; only the largest one is returned (never less than zero).
+		int numToEvict = 0;
+		for (EvictionPolicy<IN> policy : centralEvictionPolicies) {
+			// current size of the virtual central element buffer, handed
+			// to the policy together with the element
+			int bufferSize = deleteOrderForCentralEviction.size();
+			int requested = policy.notifyEviction(input, triggered, bufferSize);
+			// keep the highest request seen so far
+			numToEvict = Math.max(numToEvict, requested);
+		}
+
+		return numToEvict;
+	}
+
+	/**
+	 * This method is used to notify active central eviction policies with a
+	 * fake element.
+	 * 
+	 * @param input
+	 *            the fake element to notify the active central eviction
+	 *            policies.
+	 * @return The number of elements to be deleted from the buffer.
+	 */
+	private int centralActiveEviction(Object input) {
+		// Only the active central policies are notified with the fake
+		// element, each of them exactly once. If several request evictions,
+		// the largest request wins (requests are not added up).
+		int numToEvict = 0;
+		for (ActiveEvictionPolicy<IN> activePolicy : activeCentralEvictionPolicies) {
+			// current size of the virtual central element buffer
+			int bufferSize = deleteOrderForCentralEviction.size();
+			int requested = activePolicy.notifyEvictionWithFakeElement(input, bufferSize);
+			// keep the highest request seen so far
+			numToEvict = Math.max(numToEvict, requested);
+		}
+
+		// zero is returned when no policy requested an eviction
+		return numToEvict;
+	}
+
+	/**
+	 * This method is used in central eviction to delete a given number of
+	 * elements from the buffer.
+	 * 
+	 * @param numToEvict
+	 *            number of elements to delete from the virtual central element
+	 *            buffer.
+	 */
+	private void evictElements(int numToEvict) {
+		// Stop as soon as the virtual central buffer is empty: a policy may
+		// request more evictions than there are buffered elements.
+		for (; numToEvict > 0; numToEvict--) {
+			WindowInvokable<IN, OUT> oldestGroup = deleteOrderForCentralEviction.pollFirst();
+			if (oldestGroup == null) {
+				break;
+			}
+			// evict from the group which received the oldest buffered element
+			oldestGroup.evictFirst();
+		}
+	}
+
+	/**
 	 * This callback class allows to handle the the callbacks done by threads
 	 * defined in active trigger policies
 	 * 
@@ -313,6 +471,13 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 
 		@Override
 		public void sendFakeElement(Object datapoint) {
+
+			// If central eviction is used, handle it here
+			if (centralEvictionPolicies!=null) {
+				evictElements(centralActiveEviction(datapoint));
+			}
+
+			// handle element in groups
 			for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
 				group.processFakeElement(datapoint, policy);
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b082af06/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
index 953759c..740fa4f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
@@ -339,6 +339,18 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
 		buffer.add(input);
 
 	}
+	
+	/**
+	 * This method removes the first element from the element buffer. It is used
+	 * to provide central evictions in {@link GroupedWindowInvokable}
+	 */
+	protected synchronized void evictFirst() {
+		try {
+			buffer.removeFirst();
+		} catch (NoSuchElementException e) {
+			// deliberately ignored: presumably the buffer is empty, so there is nothing to evict
+		}
+	}
 
 	/**
 	 * This method does the final reduce at the end of the stream and emits the

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b082af06/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
index 9d10166..5ac5529 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -33,6 +34,7 @@ import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
@@ -44,6 +46,101 @@ import org.junit.Test;
 public class GroupedWindowInvokableTest {
 
 	/**
+	 * Tests that illegal arguments result in failure. The following cases are
+	 * tested: 1) having no trigger 2) having no eviction 3) having neither
+	 * eviction nor trigger 4) having both, central and distributed eviction.
+	 */
+	@Test
+	public void testGroupedWindowInvokableFailTest() {
+
+		// create dummy reduce function (never expected to produce a result)
+		ReduceFunction<Object> userFunction = new ReduceFunction<Object>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Object reduce(Object value1, Object value2) throws Exception {
+				return null;
+			}
+		};
+
+		// create dummy keySelector (maps every element to the same null key)
+		KeySelector<Object, Object> keySelector = new KeySelector<Object, Object>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Object getKey(Object value) throws Exception {
+				return null;
+			}
+		};
+
+		// create policy lists; they are reused and refilled by the cases below
+		LinkedList<CloneableEvictionPolicy<Object>> distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<Object>>();
+		LinkedList<CloneableTriggerPolicy<Object>> distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<Object>>();
+		LinkedList<EvictionPolicy<Object>> centralEvictionPolicies = new LinkedList<EvictionPolicy<Object>>();
+		LinkedList<TriggerPolicy<Object>> centralTriggerPolicies = new LinkedList<TriggerPolicy<Object>>();
+
+		// case 1: empty trigger and eviction policy lists should fail
+		try {
+			new GroupedWindowInvokable<Object, Object>(userFunction, keySelector,
+					distributedTriggerPolicies, distributedEvictionPolicies,
+					centralTriggerPolicies, centralEvictionPolicies);
+			fail("Creating instance without any trigger or eviction policy should cause an UnsupportedOperationException but didn't. (1)");
+		} catch (UnsupportedOperationException e) {
+			// that's the expected case
+		}
+
+		// case 2: null for all trigger and eviction policy lists should fail
+		try {
+			new GroupedWindowInvokable<Object, Object>(userFunction, keySelector, null, null, null,
+					null);
+			fail("Creating instance without any trigger or eviction policy should cause an UnsupportedOperationException but didn't. (2)");
+		} catch (UnsupportedOperationException e) {
+			// that's the expected case
+		}
+
+		// case 3: triggers are present now, but still no eviction policy
+		centralTriggerPolicies.add(new CountTriggerPolicy<Object>(5));
+		distributedTriggerPolicies.add(new CountTriggerPolicy<Object>(5));
+		try {
+			new GroupedWindowInvokable<Object, Object>(userFunction, keySelector,
+					distributedTriggerPolicies, distributedEvictionPolicies,
+					centralTriggerPolicies, centralEvictionPolicies);
+			fail("Creating instance without any eviction policy should cause an UnsupportedOperationException but didn't. (3)");
+		} catch (UnsupportedOperationException e) {
+			// that's the expected case
+		}
+
+		// case 4: an eviction policy is present now, but no trigger policy
+		centralTriggerPolicies.clear();
+		distributedTriggerPolicies.clear();
+		centralEvictionPolicies.add(new CountEvictionPolicy<Object>(5));
+		try {
+			new GroupedWindowInvokable<Object, Object>(userFunction, keySelector,
+					distributedTriggerPolicies, distributedEvictionPolicies,
+					centralTriggerPolicies, centralEvictionPolicies);
+			fail("Creating instance without any trigger policy should cause an UnsupportedOperationException but didn't. (4)");
+		} catch (UnsupportedOperationException e) {
+			// that's the expected case
+		}
+
+		// case 5: having both, central and distributed eviction, at the same
+		// time should fail (a trigger is added so only the eviction check fires)
+		centralTriggerPolicies.add(new CountTriggerPolicy<Object>(5));
+		distributedEvictionPolicies.add(new CountEvictionPolicy<Object>(5));
+		try {
+			new GroupedWindowInvokable<Object, Object>(userFunction, keySelector,
+					distributedTriggerPolicies, distributedEvictionPolicies,
+					centralTriggerPolicies, centralEvictionPolicies);
+			fail("Creating instance with central and distributed eviction should cause an UnsupportedOperationException but didn't. (5)");
+		} catch (UnsupportedOperationException e) {
+			// that's the expected case
+		}
+
+	}
+
+	/**
 	 * Test for not active distributed triggers with single field
 	 */
 	@Test
@@ -60,12 +157,23 @@ public class GroupedWindowInvokableTest {
 		inputs.add(1);
 		inputs.add(5);
 
-		List<Integer> expected = new ArrayList<Integer>();
-		expected.add(15);
-		expected.add(3);
-		expected.add(3);
-		expected.add(15);
-
+		List<Integer> expectedDistributedEviction = new ArrayList<Integer>();
+		expectedDistributedEviction.add(15);
+		expectedDistributedEviction.add(3);
+		expectedDistributedEviction.add(3);
+		expectedDistributedEviction.add(15);
+		
+		List<Integer> expectedCentralEviction = new ArrayList<Integer>();
+		expectedCentralEviction.add(2);
+		expectedCentralEviction.add(5);
+		expectedCentralEviction.add(15);
+		expectedCentralEviction.add(2);
+		expectedCentralEviction.add(5);
+		expectedCentralEviction.add(2);
+		expectedCentralEviction.add(5);
+		expectedCentralEviction.add(1);
+		expectedCentralEviction.add(5);
+		
 		LinkedList<CloneableTriggerPolicy<Integer>> triggers = new LinkedList<CloneableTriggerPolicy<Integer>>();
 		// Trigger on every 2nd element, but the first time after the 3rd
 		triggers.add(new CountTriggerPolicy<Integer>(2, -1));
@@ -77,22 +185,26 @@ public class GroupedWindowInvokableTest {
 
 		LinkedList<TriggerPolicy<Integer>> centralTriggers = new LinkedList<TriggerPolicy<Integer>>();
 
-		GroupedWindowInvokable<Integer, Integer> invokable = new GroupedWindowInvokable<Integer, Integer>(
-				new ReduceFunction<Integer>() {
-					private static final long serialVersionUID = 1L;
+		ReduceFunction<Integer> reduceFunction=new ReduceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
 
-					@Override
-					public Integer reduce(Integer value1, Integer value2) throws Exception {
-						return value1 + value2;
-					}
-				}, new KeySelector<Integer, Integer>() {
-					private static final long serialVersionUID = 1L;
+			@Override
+			public Integer reduce(Integer value1, Integer value2) throws Exception {
+				return value1 + value2;
+			}
+		};
+		
+		KeySelector<Integer, Integer> keySelector=new KeySelector<Integer, Integer>() {
+			private static final long serialVersionUID = 1L;
 
-					@Override
-					public Integer getKey(Integer value) {
-						return value;
-					}
-				}, triggers, evictions, centralTriggers);
+			@Override
+			public Integer getKey(Integer value) {
+				return value;
+			}
+		};
+		
+		GroupedWindowInvokable<Integer, Integer> invokable = new GroupedWindowInvokable<Integer, Integer>(
+				reduceFunction, keySelector, triggers, evictions, centralTriggers, null);
 
 		List<Integer> result = MockInvokable.createAndExecute(invokable, inputs);
 
@@ -101,8 +213,26 @@ public class GroupedWindowInvokableTest {
 			actual.add(current);
 		}
 
-		assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(actual));
-		assertEquals(expected.size(), actual.size());
+		assertEquals(new HashSet<Integer>(expectedDistributedEviction), new HashSet<Integer>(actual));
+		assertEquals(expectedDistributedEviction.size(), actual.size());
+		
+		//Run test with central eviction
+		triggers.clear();
+		centralTriggers.add(new CountTriggerPolicy<Integer>(2, -1));
+		LinkedList<EvictionPolicy<Integer>> centralEvictions = new LinkedList<EvictionPolicy<Integer>>();
+		centralEvictions.add(new CountEvictionPolicy<Integer>(2, 2, -1));
+		
+		invokable = new GroupedWindowInvokable<Integer, Integer>(
+				reduceFunction, keySelector, triggers, null, centralTriggers,centralEvictions);
+		
+		result = MockInvokable.createAndExecute(invokable, inputs);
+		actual = new LinkedList<Integer>();
+		for (Integer current : result) {
+			actual.add(current);
+		}
+
+		assertEquals(new HashSet<Integer>(expectedCentralEviction), new HashSet<Integer>(actual));
+		assertEquals(expectedCentralEviction.size(), actual.size());
 	}
 
 	/**
@@ -150,7 +280,7 @@ public class GroupedWindowInvokableTest {
 						}
 					}
 				}, new TupleKeySelector<Tuple2<Integer, String>>(1), triggers, evictions,
-				centralTriggers);
+				centralTriggers, null);
 
 		List<Tuple2<Integer, String>> result = MockInvokable.createAndExecute(invokable2, inputs2);
 
@@ -258,7 +388,7 @@ public class GroupedWindowInvokableTest {
 
 		GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>> invokable = new GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
 				myReduceFunction, new TupleKeySelector<Tuple2<Integer, String>>(1),
-				distributedTriggers, evictions, triggers);
+				distributedTriggers, evictions, triggers, null);
 
 		ArrayList<Tuple2<Integer, String>> result = new ArrayList<Tuple2<Integer, String>>();
 		for (Tuple2<Integer, String> t : MockInvokable.createAndExecute(invokable, inputs)) {
@@ -268,6 +398,26 @@ public class GroupedWindowInvokableTest {
 		assertEquals(new HashSet<Tuple2<Integer, String>>(expected),
 				new HashSet<Tuple2<Integer, String>>(result));
 		assertEquals(expected.size(), result.size());
+		
+		// repeat the test with central eviction. The result should be the same.
+		triggers.clear();
+		triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStamp, 2L));
+		evictions.clear();
+		LinkedList<EvictionPolicy<Tuple2<Integer, String>>> centralEvictions = new LinkedList<EvictionPolicy<Tuple2<Integer, String>>>();
+		centralEvictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStamp));
+
+		invokable = new GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
+				myReduceFunction, new TupleKeySelector<Tuple2<Integer, String>>(1),
+				distributedTriggers, evictions, triggers, centralEvictions);
+
+		result = new ArrayList<Tuple2<Integer, String>>();
+		for (Tuple2<Integer, String> t : MockInvokable.createAndExecute(invokable, inputs)) {
+			result.add(t);
+		}
+
+		assertEquals(new HashSet<Tuple2<Integer, String>>(expected),
+				new HashSet<Tuple2<Integer, String>>(result));
+		assertEquals(expected.size(), result.size());
 	}
 
 	/**
@@ -327,7 +477,7 @@ public class GroupedWindowInvokableTest {
 					public Integer getKey(Integer value) {
 						return value;
 					}
-				}, distributedTriggers, evictions, triggers);
+				}, distributedTriggers, evictions, triggers, null);
 
 		ArrayList<Integer> result = new ArrayList<Integer>();
 		for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
@@ -403,7 +553,7 @@ public class GroupedWindowInvokableTest {
 					public Integer getKey(Integer value) {
 						return value;
 					}
-				}, distributedTriggers, evictions, triggers);
+				}, distributedTriggers, evictions, triggers, null);
 
 		ArrayList<Integer> result = new ArrayList<Integer>();
 		for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {