You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:36 UTC
[31/34] incubator-flink git commit: [streaming] Introduced central
eviction policies in the grouped window invokable
[streaming] Introduced central eviction policies in the grouped window invokable
[streaming] Extended test cases for grouped window invokable to cover usage of central eviction policies
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b082af06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b082af06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b082af06
Branch: refs/heads/master
Commit: b082af06549f639d910173cac9589d23778cf350
Parents: 240e5f1
Author: Jonas Traub (powibol) <jo...@s-traub.com>
Authored: Tue Nov 25 14:55:52 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Fri Dec 5 16:45:10 2014 +0100
----------------------------------------------------------------------
.../operator/GroupedWindowInvokable.java | 223 ++++++++++++++++---
.../api/invokable/operator/WindowInvokable.java | 12 +
.../operator/GroupedWindowInvokableTest.java | 202 ++++++++++++++---
3 files changed, 382 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b082af06/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
index 844a488..29e154b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.invokable.operator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
+import java.util.NoSuchElementException;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -28,6 +29,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
@@ -43,21 +45,21 @@ import org.slf4j.LoggerFactory;
* versions. It is additionally aware of the creation of windows per group.
*
* A {@link KeySelector} is used to specify the key position or key extraction.
- * The {@link ReduceFunction} will be executed on each group separately. Trigger
- * policies might either be centralized or distributed. Eviction policies are
- * always distributed. A distributed policy have to be a
- * {@link CloneableTriggerPolicy} or {@link CloneableEvictionPolicy} as it will
- * be cloned to have separated instances for each group. At the startup time the
- * distributed policies will be stored as sample, and only clones of them will
- * be used to maintain the groups. Therefore, each group starts with the initial
- * policy states.
+ * The {@link ReduceFunction} will be executed on each group separately.
+ * Policies might either be centralized or distributed. It is not possible to
+ * use central and distributed eviction policies at the same time. A distributed
+ * policy has to be a {@link CloneableTriggerPolicy} or
+ * {@link CloneableEvictionPolicy} as it will be cloned to have separated
+ * instances for each group. At the startup time the distributed policies will
+ * be stored as sample, and only clones of them will be used to maintain the
+ * groups. Therefore, each group starts with the initial policy states.
*
* While a distributed policy only gets notified with the elements belonging to
* the respective group, a centralized policy get notified with all arriving
* elements. When a centralized trigger occurred, all groups get triggered. This
* is done by submitting the element which caused the trigger as real element to
* the groups it belongs to and as fake element to all other groups. Within the
- * groups the element might be further processed, causing more triggered,
+ * groups the element might be further processed, causing more triggers,
* prenotifications of active distributed policies and evictions like usual.
*
* Central policies can be instance of {@link ActiveTriggerPolicy} and also
@@ -83,23 +85,28 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
private Configuration parameters;
private LinkedList<ActiveTriggerPolicy<IN>> activeCentralTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>();
private LinkedList<TriggerPolicy<IN>> centralTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+ private LinkedList<ActiveEvictionPolicy<IN>> activeCentralEvictionPolicies;
+ private LinkedList<EvictionPolicy<IN>> centralEvictionPolicies;
private LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<IN>>();
private LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<IN>>();
private Map<Object, WindowInvokable<IN, OUT>> windowingGroups = new HashMap<Object, WindowInvokable<IN, OUT>>();
private LinkedList<Thread> activePolicyThreads = new LinkedList<Thread>();
private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
-
+ private LinkedList<WindowInvokable<IN, OUT>> deleteOrderForCentralEviction;
+
/**
* This constructor creates an instance of the grouped windowing invokable.
+ *
* A {@link KeySelector} is used to specify the key position or key
* extraction. The {@link ReduceFunction} will be executed on each group
- * separately. Trigger policies might either be centralized or distributed.
- * Eviction policies are always distributed. A distributed policy have to be
- * a {@link CloneableTriggerPolicy} or {@link CloneableEvictionPolicy} as it
- * will be cloned to have separated instances for each group. At the startup
- * time the distributed policies will be stored as sample, and only clones
- * of them will be used to maintain the groups. Therefore, each group starts
- * with the initial policy states.
+ * separately. Policies might either be centralized or distributed. It is
+ * not possible to use central and distributed eviction policies at the same
+ * time. A distributed policy has to be a {@link CloneableTriggerPolicy} or
+ * {@link CloneableEvictionPolicy} as it will be cloned to have separated
+ * instances for each group. At the startup time the distributed policies
+ * will be stored as sample, and only clones of them will be used to
+ * maintain the groups. Therefore, each group starts with the initial policy
+ * states.
*
* While a distributed policy only gets notified with the elements belonging
* to the respective group, a centralized policy get notified with all
@@ -107,7 +114,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
* triggered. This is done by submitting the element which caused the
* trigger as real element to the groups it belongs to and as fake element
* to all other groups. Within the groups the element might be further
- * processed, causing more triggered, prenotifications of active distributed
+ * processed, causing more triggers, prenotifications of active distributed
* policies and evictions like usual.
*
* Central policies can be instance of {@link ActiveTriggerPolicy} and also
@@ -127,33 +134,88 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
* within each group.
* @param distributedEvictionPolicies
* Eviction policies to be distributed and maintained
- * individually within each group. There are no central eviction
- * policies because there is no central element buffer but only a
- * buffer per group. Therefore evictions might always be done per
- * group.
+ * individually within each group. Note that there cannot be
+ * both, central and distributed eviction policies at the same
+ * time.
* @param centralTriggerPolicies
* Trigger policies which will only exist once at a central
* place. In case a central policy triggers, it will cause all
* groups to be emitted. (Remark: Empty groups cannot be emitted.
* If only one element is contained a group, this element itself
* is returned as aggregated result.)
+ * @param centralEvictionPolicies
+ * Eviction policies which will only exist once at a central
+ * place. Note that there cannot be both, central and distributed
+ * eviction policies at the same time. The central eviction
+ * policies will work on a simulated element buffer containing all
+ * elements no matter which group they belong to.
*/
public GroupedWindowInvokable(Function userFunction, KeySelector<IN, ?> keySelector,
LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies,
LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies,
- LinkedList<TriggerPolicy<IN>> centralTriggerPolicies) {
+ LinkedList<TriggerPolicy<IN>> centralTriggerPolicies,
+ LinkedList<EvictionPolicy<IN>> centralEvictionPolicies) {
super(userFunction);
+
+ // Check that central and distributed eviction are not used at the
+ // same time.
+ if (centralEvictionPolicies != null && distributedEvictionPolicies != null
+ && !centralEvictionPolicies.isEmpty() && !distributedEvictionPolicies.isEmpty()) {
+ throw new UnsupportedOperationException(
+ "You can only use either central or distributed eviction policies but not both at the same time.");
+ }
+
+ // Check that there is at least one trigger and one eviction policy
+ if ((centralEvictionPolicies == null || centralEvictionPolicies.isEmpty())
+ && (distributedEvictionPolicies == null || distributedEvictionPolicies.isEmpty())) {
+ throw new UnsupportedOperationException(
+ "You have to define at least one eviction policy");
+ }
+ if ((centralTriggerPolicies == null || centralTriggerPolicies.isEmpty())
+ && (distributedTriggerPolicies == null || distributedTriggerPolicies.isEmpty())) {
+ throw new UnsupportedOperationException(
+ "You have to define at least one trigger policy");
+ }
+
this.keySelector = keySelector;
+
+ // handle the triggers
this.centralTriggerPolicies = centralTriggerPolicies;
this.distributedTriggerPolicies = distributedTriggerPolicies;
- this.distributedEvictionPolicies = distributedEvictionPolicies;
-
for (TriggerPolicy<IN> trigger : centralTriggerPolicies) {
if (trigger instanceof ActiveTriggerPolicy) {
this.activeCentralTriggerPolicies.add((ActiveTriggerPolicy<IN>) trigger);
}
}
+
+ // handle the evictions
+ if (distributedEvictionPolicies != null && !distributedEvictionPolicies.isEmpty()) {
+ this.distributedEvictionPolicies = distributedEvictionPolicies;
+ } else { // (centralEvictionPolicies!=null&&!centralEvictionPolicies.isEmpty())
+ this.centralEvictionPolicies = centralEvictionPolicies;
+ this.activeCentralEvictionPolicies = new LinkedList<ActiveEvictionPolicy<IN>>();
+ for (EvictionPolicy<IN> eviction : centralEvictionPolicies) {
+ if (eviction instanceof ActiveEvictionPolicy) {
+ this.activeCentralEvictionPolicies.add((ActiveEvictionPolicy<IN>) eviction);
+ }
+ }
+ this.deleteOrderForCentralEviction = new LinkedList<WindowInvokable<IN, OUT>>();
+ }
+
+ }
+
+ /**
+ * Same as
+ * {@link GroupedWindowInvokable#GroupedWindowInvokable(Function, KeySelector, LinkedList, LinkedList, LinkedList, LinkedList)}
+ * but always using distributed eviction only.
+ */
+ public GroupedWindowInvokable(Function userFunction, KeySelector<IN, ?> keySelector,
+ LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies,
+ LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies,
+ LinkedList<TriggerPolicy<IN>> centralTriggerPolicies) {
+ this(userFunction, keySelector, distributedTriggerPolicies, distributedEvictionPolicies,
+ centralTriggerPolicies, null);
}
@Override
@@ -165,8 +227,8 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
// Continuously run
while (reuse != null) {
- WindowInvokable<IN, OUT> groupInvokable = windowingGroups.get(keySelector
- .getKey(reuse.getObject()));
+ WindowInvokable<IN, OUT> groupInvokable = windowingGroups.get(keySelector.getKey(reuse
+ .getObject()));
if (groupInvokable == null) {
groupInvokable = makeNewGroup(reuse);
}
@@ -175,6 +237,13 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
for (ActiveTriggerPolicy<IN> trigger : activeCentralTriggerPolicies) {
Object[] result = trigger.preNotifyTrigger(reuse.getObject());
for (Object in : result) {
+
+ // If central eviction is used, handle it here
+ if (activeCentralEvictionPolicies!=null) {
+ evictElements(centralActiveEviction(in));
+ }
+
+ // process in groups
for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
group.processFakeElement(in, trigger);
}
@@ -189,9 +258,18 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
}
if (currentTriggerPolicies.isEmpty()) {
+
// only add the element to its group
groupInvokable.processRealElement(reuse.getObject());
+
+ // If central eviction is used, handle it here
+ if (centralEvictionPolicies!=null) {
+ evictElements(centralEviction(reuse.getObject(), false));
+ deleteOrderForCentralEviction.add(groupInvokable);
+ }
+
} else {
+
// call user function for all groups
for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
if (group == groupInvokable) {
@@ -203,6 +281,12 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
group.externalTriggerFakeElement(reuse.getObject(), currentTriggerPolicies);
}
}
+
+ // If central eviction is used, handle it here
+ if (centralEvictionPolicies!=null) {
+ evictElements(centralEviction(reuse.getObject(), true));
+ deleteOrderForCentralEviction.add(groupInvokable);
+ }
}
// clear current trigger list
@@ -228,8 +312,8 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
/**
* This method creates a new group. The method gets called in case an
* element arrives which has a key which was not seen before. The method
- * created a nested {@link WindowInvokable} and therefore created clones
- * of all distributed trigger and eviction policies.
+ * creates a nested {@link WindowInvokable} and therefore creates clones of
+ * all distributed trigger and eviction policies.
*
* @param element
* The element which leads to the generation of a new group
@@ -299,6 +383,80 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
};
/**
+ * This method is used to notify central eviction policies with a real
+ * element.
+ *
+ * @param input
+ * the real element to notify the eviction policy.
+ * @param triggered
+ * whether a central trigger occurred or not.
+ * @return The number of elements to be deleted from the buffer.
+ */
+ private int centralEviction(IN input, boolean triggered) {
+ // Every central eviction policy is notified once per call with the
+ // real element. If several policies request evictions, the
+ // requests are not summed up (that would evict elements twice);
+ // only the largest request is returned.
+ int numToEvict = 0;
+ for (EvictionPolicy<IN> policy : centralEvictionPolicies) {
+ // the length of the deletion order list is handed over as the
+ // current size of the virtual central element buffer; the
+ // result is kept in a local so the policy is notified only once
+ int requested = policy.notifyEviction(input, triggered,
+ deleteOrderForCentralEviction.size());
+ numToEvict = Math.max(numToEvict, requested);
+ }
+ return numToEvict;
+ }
+
+ /**
+ * This method is used to notify active central eviction policies with a
+ * fake element.
+ *
+ * @param input
+ * the fake element to notify the active central eviction
+ * policies.
+ * @return The number of elements to be deleted from the buffer.
+ */
+ private int centralActiveEviction(Object input) {
+ // Every active central eviction policy is notified once per call
+ // with the fake element. If several policies request evictions,
+ // the requests are not summed up (that would evict elements
+ // twice); only the largest request is returned.
+ int numToEvict = 0;
+ for (ActiveEvictionPolicy<IN> policy : activeCentralEvictionPolicies) {
+ // the length of the deletion order list is handed over as the
+ // current size of the virtual central element buffer; the
+ // result is kept in a local so the policy is notified only once
+ int requested = policy.notifyEvictionWithFakeElement(input,
+ deleteOrderForCentralEviction.size());
+ numToEvict = Math.max(numToEvict, requested);
+ }
+ return numToEvict;
+ }
+
+ /**
+ * This method is used in central eviction to delete a given number of
+ * elements from the virtual central element buffer. The oldest elements
+ * are deleted first. Deletion stops early once the buffer is empty.
+ *
+ * @param numToEvict
+ * number of elements to delete from the virtual central element
+ * buffer. Values below one cause no deletion.
+ */
+ private void evictElements(int numToEvict) {
+ for (; numToEvict > 0; numToEvict--) {
+ // An eviction policy may request more evictions than elements are
+ // buffered. getFirst() would throw a NoSuchElementException then,
+ // so check for the empty buffer up front and stop deleting.
+ if (deleteOrderForCentralEviction.isEmpty()) {
+ break;
+ }
+ deleteOrderForCentralEviction.removeFirst().evictFirst();
+ }
+ }
+
+ /**
* This callback class allows to handle the the callbacks done by threads
* defined in active trigger policies
*
@@ -313,6 +471,13 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
@Override
public void sendFakeElement(Object datapoint) {
+
+ // If central eviction is used, handle it here
+ if (centralEvictionPolicies!=null) {
+ evictElements(centralActiveEviction(datapoint));
+ }
+
+ // handle element in groups
for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
group.processFakeElement(datapoint, policy);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b082af06/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
index 953759c..740fa4f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
@@ -339,6 +339,18 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
buffer.add(input);
}
+
+ /**
+ * This method removes the first element from the element buffer. It is used
+ * to provide central evictions in {@link GroupedWindowInvokable}
+ */
+ protected synchronized void evictFirst() {
+ // an empty buffer is left untouched, no exception is raised
+ if (buffer.isEmpty()) {
+ return;
+ }
+ buffer.removeFirst();
+ }
/**
* This method does the final reduce at the end of the stream and emits the
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b082af06/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
index 9d10166..5ac5529 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.invokable.operator;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.HashSet;
@@ -33,6 +34,7 @@ import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
@@ -44,6 +46,101 @@ import org.junit.Test;
public class GroupedWindowInvokableTest {
/**
+ * Tests that illegal arguments result in failure. The following cases are
+ * tested: 1) having no trigger 2) having no eviction 3) having neither
+ * eviction nor trigger 4) having both, central and distributed eviction.
+ */
+ @Test
+ public void testGroupedWindowInvokableFailTest() {
+
+ // create dummy reduce function (never executed by this test)
+ ReduceFunction<Object> userFunction = new ReduceFunction<Object>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object reduce(Object value1, Object value2) throws Exception {
+ return null;
+ }
+ };
+
+ // create dummy keySelector (never executed by this test)
+ KeySelector<Object, Object> keySelector = new KeySelector<Object, Object>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object getKey(Object value) throws Exception {
+ return null;
+ }
+ };
+
+ // create policy lists; they are reused and refilled for each case
+ LinkedList<CloneableEvictionPolicy<Object>> distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<Object>>();
+ LinkedList<CloneableTriggerPolicy<Object>> distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<Object>>();
+ LinkedList<EvictionPolicy<Object>> centralEvictionPolicies = new LinkedList<EvictionPolicy<Object>>();
+ LinkedList<TriggerPolicy<Object>> centralTriggerPolicies = new LinkedList<TriggerPolicy<Object>>();
+
+ // (1) empty trigger and eviction policy lists should fail
+ try {
+ new GroupedWindowInvokable<Object, Object>(userFunction, keySelector,
+ distributedTriggerPolicies, distributedEvictionPolicies,
+ centralTriggerPolicies, centralEvictionPolicies);
+ fail("Creating instance without any trigger or eviction policy should cause an UnsupportedOperationException but didn't. (1)");
+ } catch (UnsupportedOperationException e) {
+ // that's the expected case
+ }
+
+ // (2) null for trigger and eviction policy lists should fail
+ try {
+ new GroupedWindowInvokable<Object, Object>(userFunction, keySelector, null, null, null,
+ null);
+ fail("Creating instance without any trigger or eviction policy should cause an UnsupportedOperationException but didn't. (2)");
+ } catch (UnsupportedOperationException e) {
+ // that's the expected case
+ }
+
+ // (3) having triggers but no eviction policy should still fail
+ centralTriggerPolicies.add(new CountTriggerPolicy<Object>(5));
+ distributedTriggerPolicies.add(new CountTriggerPolicy<Object>(5));
+ try {
+ new GroupedWindowInvokable<Object, Object>(userFunction, keySelector,
+ distributedTriggerPolicies, distributedEvictionPolicies,
+ centralTriggerPolicies, centralEvictionPolicies);
+ fail("Creating instance without any eviction policy should cause an UnsupportedOperationException but didn't. (3)");
+ } catch (UnsupportedOperationException e) {
+ // that's the expected case
+ }
+
+ // (4) having an eviction policy but no trigger should still fail
+ centralTriggerPolicies.clear();
+ distributedTriggerPolicies.clear();
+ centralEvictionPolicies.add(new CountEvictionPolicy<Object>(5));
+ try {
+ new GroupedWindowInvokable<Object, Object>(userFunction, keySelector,
+ distributedTriggerPolicies, distributedEvictionPolicies,
+ centralTriggerPolicies, centralEvictionPolicies);
+ fail("Creating instance without any trigger policy should cause an UnsupportedOperationException but didn't. (4)");
+ } catch (UnsupportedOperationException e) {
+ // that's the expected case
+ }
+
+ // (5) having both, central and distributed eviction, at the same time
+ // should fail
+ centralTriggerPolicies.add(new CountTriggerPolicy<Object>(5));
+ distributedEvictionPolicies.add(new CountEvictionPolicy<Object>(5));
+ try {
+ new GroupedWindowInvokable<Object, Object>(userFunction, keySelector,
+ distributedTriggerPolicies, distributedEvictionPolicies,
+ centralTriggerPolicies, centralEvictionPolicies);
+ fail("Creating instance with central and distributed eviction should cause an UnsupportedOperationException but didn't. (5)");
+ } catch (UnsupportedOperationException e) {
+ // that's the expected case
+ }
+
+ }
+
+ /**
* Test for not active distributed triggers with single field
*/
@Test
@@ -60,12 +157,23 @@ public class GroupedWindowInvokableTest {
inputs.add(1);
inputs.add(5);
- List<Integer> expected = new ArrayList<Integer>();
- expected.add(15);
- expected.add(3);
- expected.add(3);
- expected.add(15);
-
+ List<Integer> expectedDistributedEviction = new ArrayList<Integer>();
+ expectedDistributedEviction.add(15);
+ expectedDistributedEviction.add(3);
+ expectedDistributedEviction.add(3);
+ expectedDistributedEviction.add(15);
+
+ List<Integer> expectedCentralEviction = new ArrayList<Integer>();
+ expectedCentralEviction.add(2);
+ expectedCentralEviction.add(5);
+ expectedCentralEviction.add(15);
+ expectedCentralEviction.add(2);
+ expectedCentralEviction.add(5);
+ expectedCentralEviction.add(2);
+ expectedCentralEviction.add(5);
+ expectedCentralEviction.add(1);
+ expectedCentralEviction.add(5);
+
LinkedList<CloneableTriggerPolicy<Integer>> triggers = new LinkedList<CloneableTriggerPolicy<Integer>>();
// Trigger on every 2nd element, but the first time after the 3rd
triggers.add(new CountTriggerPolicy<Integer>(2, -1));
@@ -77,22 +185,26 @@ public class GroupedWindowInvokableTest {
LinkedList<TriggerPolicy<Integer>> centralTriggers = new LinkedList<TriggerPolicy<Integer>>();
- GroupedWindowInvokable<Integer, Integer> invokable = new GroupedWindowInvokable<Integer, Integer>(
- new ReduceFunction<Integer>() {
- private static final long serialVersionUID = 1L;
+ ReduceFunction<Integer> reduceFunction=new ReduceFunction<Integer>() {
+ private static final long serialVersionUID = 1L;
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- return value1 + value2;
- }
- }, new KeySelector<Integer, Integer>() {
- private static final long serialVersionUID = 1L;
+ @Override
+ public Integer reduce(Integer value1, Integer value2) throws Exception {
+ return value1 + value2;
+ }
+ };
+
+ KeySelector<Integer, Integer> keySelector=new KeySelector<Integer, Integer>() {
+ private static final long serialVersionUID = 1L;
- @Override
- public Integer getKey(Integer value) {
- return value;
- }
- }, triggers, evictions, centralTriggers);
+ @Override
+ public Integer getKey(Integer value) {
+ return value;
+ }
+ };
+
+ GroupedWindowInvokable<Integer, Integer> invokable = new GroupedWindowInvokable<Integer, Integer>(
+ reduceFunction, keySelector, triggers, evictions, centralTriggers, null);
List<Integer> result = MockInvokable.createAndExecute(invokable, inputs);
@@ -101,8 +213,26 @@ public class GroupedWindowInvokableTest {
actual.add(current);
}
- assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(actual));
- assertEquals(expected.size(), actual.size());
+ assertEquals(new HashSet<Integer>(expectedDistributedEviction), new HashSet<Integer>(actual));
+ assertEquals(expectedDistributedEviction.size(), actual.size());
+
+ //Run test with central eviction
+ triggers.clear();
+ centralTriggers.add(new CountTriggerPolicy<Integer>(2, -1));
+ LinkedList<EvictionPolicy<Integer>> centralEvictions = new LinkedList<EvictionPolicy<Integer>>();
+ centralEvictions.add(new CountEvictionPolicy<Integer>(2, 2, -1));
+
+ invokable = new GroupedWindowInvokable<Integer, Integer>(
+ reduceFunction, keySelector, triggers, null, centralTriggers,centralEvictions);
+
+ result = MockInvokable.createAndExecute(invokable, inputs);
+ actual = new LinkedList<Integer>();
+ for (Integer current : result) {
+ actual.add(current);
+ }
+
+ assertEquals(new HashSet<Integer>(expectedCentralEviction), new HashSet<Integer>(actual));
+ assertEquals(expectedCentralEviction.size(), actual.size());
}
/**
@@ -150,7 +280,7 @@ public class GroupedWindowInvokableTest {
}
}
}, new TupleKeySelector<Tuple2<Integer, String>>(1), triggers, evictions,
- centralTriggers);
+ centralTriggers, null);
List<Tuple2<Integer, String>> result = MockInvokable.createAndExecute(invokable2, inputs2);
@@ -258,7 +388,7 @@ public class GroupedWindowInvokableTest {
GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>> invokable = new GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
myReduceFunction, new TupleKeySelector<Tuple2<Integer, String>>(1),
- distributedTriggers, evictions, triggers);
+ distributedTriggers, evictions, triggers, null);
ArrayList<Tuple2<Integer, String>> result = new ArrayList<Tuple2<Integer, String>>();
for (Tuple2<Integer, String> t : MockInvokable.createAndExecute(invokable, inputs)) {
@@ -268,6 +398,26 @@ public class GroupedWindowInvokableTest {
assertEquals(new HashSet<Tuple2<Integer, String>>(expected),
new HashSet<Tuple2<Integer, String>>(result));
assertEquals(expected.size(), result.size());
+
+ // repeat the test with central eviction. The result should be the same.
+ triggers.clear();
+ triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStamp, 2L));
+ evictions.clear();
+ LinkedList<EvictionPolicy<Tuple2<Integer, String>>> centralEvictions = new LinkedList<EvictionPolicy<Tuple2<Integer, String>>>();
+ centralEvictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStamp));
+
+ invokable = new GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
+ myReduceFunction, new TupleKeySelector<Tuple2<Integer, String>>(1),
+ distributedTriggers, evictions, triggers, centralEvictions);
+
+ result = new ArrayList<Tuple2<Integer, String>>();
+ for (Tuple2<Integer, String> t : MockInvokable.createAndExecute(invokable, inputs)) {
+ result.add(t);
+ }
+
+ assertEquals(new HashSet<Tuple2<Integer, String>>(expected),
+ new HashSet<Tuple2<Integer, String>>(result));
+ assertEquals(expected.size(), result.size());
}
/**
@@ -327,7 +477,7 @@ public class GroupedWindowInvokableTest {
public Integer getKey(Integer value) {
return value;
}
- }, distributedTriggers, evictions, triggers);
+ }, distributedTriggers, evictions, triggers, null);
ArrayList<Integer> result = new ArrayList<Integer>();
for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
@@ -403,7 +553,7 @@ public class GroupedWindowInvokableTest {
public Integer getKey(Integer value) {
return value;
}
- }, distributedTriggers, evictions, triggers);
+ }, distributedTriggers, evictions, triggers, null);
ArrayList<Integer> result = new ArrayList<Integer>();
for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {