You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:11 UTC

[06/34] incubator-flink git commit: [streaming] Created trigger and eviction policies based on element counters. Included respective policy helper and test cases.

[streaming] Created trigger and eviction policies based on element counters. Included respective policy helper and test cases.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d731b3b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d731b3b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d731b3b0

Branch: refs/heads/master
Commit: d731b3b06bfb7d61b0464715f22482c34ad88ff6
Parents: 93178ae
Author: Jonas Traub (powibol) <jo...@s-traub.com>
Authored: Mon Oct 27 13:22:26 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Fri Dec 5 16:45:08 2014 +0100

----------------------------------------------------------------------
 .../streaming/api/windowing/helper/Count.java   |  68 +++++++++++
 .../windowing/policy/CountEvictionPolicy.java   | 115 ++++++++++++++++++
 .../windowing/policy/CountTriggerPolicy.java    |  83 +++++++++++++
 .../policy/CountEvictionPolicyTest.java         | 119 +++++++++++++++++++
 .../policy/CountTriggerPolicyTest.java          |  94 +++++++++++++++
 5 files changed, 479 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d731b3b0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
new file mode 100644
index 0000000..b74e952
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.helper;
+
+import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+/**
+ * Windowing helper representing a count based trigger or eviction policy.
+ * Use {@link Count#of(int)} to get an instance.
+ */
+@SuppressWarnings("rawtypes")
+public class Count implements WindowingHelper {
+
+	/** The number of elements to count before a trigger/eviction happens. */
+	private final int count;
+
+	/**
+	 * Specifies on which element a trigger or an eviction should happen (based
+	 * on the count of the elements).
+	 * 
+	 * This constructor does exactly the same as {@link Count#of(int)}.
+	 * 
+	 * @param count
+	 *            the number of elements to count before trigger/evict
+	 */
+	public Count(int count) {
+		this.count = count;
+	}
+
+	/**
+	 * Creates a {@link CountEvictionPolicy} with {@code count} as its maximum
+	 * number of elements, evicting one element whenever that count would be
+	 * exceeded.
+	 * 
+	 * @return a new count based eviction policy
+	 */
+	@Override
+	public EvictionPolicy<?> toEvict() {
+		return new CountEvictionPolicy<Object>(count);
+	}
+
+	/**
+	 * Creates a {@link CountTriggerPolicy} which triggers after {@code count}
+	 * elements have arrived.
+	 * 
+	 * @return a new count based trigger policy
+	 */
+	@Override
+	public TriggerPolicy<?> toTrigger() {
+		return new CountTriggerPolicy<Object>(count);
+	}
+
+	/**
+	 * Specifies on which element a trigger or an eviction should happen (based
+	 * on the count of the elements).
+	 * 
+	 * @param count
+	 *            the number of elements to count before trigger/evict
+	 * @return A helper representing the policy.
+	 */
+	public static Count of(int count) {
+		return new Count(count);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d731b3b0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
new file mode 100644
index 0000000..1d8175c
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+/**
+ * This eviction policy allows the eviction of data points from the buffer using
+ * a counter of arriving elements and a threshold (maximal buffer size).
+ * 
+ * By default this policy does not react on fake elements. Wrap it in an
+ * {@link ActiveEvictionPolicyWrapper} to make it count even fake elements.
+ * 
+ * @param <IN>
+ *            the type of the incoming data points
+ */
+public class CountEvictionPolicy<IN> implements EvictionPolicy<IN> {
+
+	/**
+	 * Auto generated version id
+	 */
+	private static final long serialVersionUID = 2319201348806427996L;
+
+	/** An eviction happens as soon as the counter has reached this value. */
+	int maxElements;
+	/**
+	 * Counter of arriving elements. It is reduced by deleteOnEviction on each
+	 * eviction (clamped at zero) and may start negative to delay the first
+	 * eviction.
+	 */
+	int counter;
+	/** The number of elements to remove from the buffer per eviction. */
+	int deleteOnEviction;
+
+	/**
+	 * This constructor allows the setup of the simplest possible count based
+	 * eviction. It keeps the size of the buffer according to the given
+	 * maxElements parameter by deleting the oldest element in the buffer.
+	 * Eviction only takes place if the counter of arriving elements would be
+	 * higher than maxElements without eviction.
+	 * 
+	 * @param maxElements
+	 *            The maximum number of elements before eviction. As soon as one
+	 *            more element arrives, the oldest element will be deleted
+	 */
+	public CountEvictionPolicy(int maxElements) {
+		this(maxElements, 1);
+	}
+
+	/**
+	 * This constructor allows to set up both, the maximum number of elements
+	 * and the number of elements to be deleted in case of an eviction.
+	 * 
+	 * Eviction only takes place if the counter of arriving elements would be
+	 * higher than maxElements without eviction. In such a case deleteOnEviction
+	 * elements will be removed from the buffer.
+	 * 
+	 * The counter of arriving elements is adjusted respectively, but never set
+	 * below zero:
+	 * counter=(counter-deleteOnEviction<0)?0:counter-deleteOnEviction
+	 * 
+	 * @param maxElements
+	 *            The maximum number of elements before eviction.
+	 * @param deleteOnEviction
+	 *            The number of elements to be deleted on eviction. The counter
+	 *            will be adjusted respectively but never below zero.
+	 */
+	public CountEvictionPolicy(int maxElements, int deleteOnEviction) {
+		this(maxElements, deleteOnEviction, 0);
+	}
+
+	/**
+	 * The same as {@link CountEvictionPolicy#CountEvictionPolicy(int, int)}.
+	 * Additionally a custom start value for the counter of arriving elements
+	 * can be set. By setting a negative start value the first eviction can be
+	 * delayed.
+	 * 
+	 * @param maxElements
+	 *            The maximum number of elements before eviction.
+	 * @param deleteOnEviction
+	 *            The number of elements to be deleted on eviction. The counter
+	 *            will be adjusted respectively but never below zero.
+	 * @param startValue
+	 *            A custom start value for the counter of arriving elements.
+	 * @see CountEvictionPolicy#CountEvictionPolicy(int, int)
+	 */
+	public CountEvictionPolicy(int maxElements, int deleteOnEviction, int startValue) {
+		this.counter = startValue;
+		this.deleteOnEviction = deleteOnEviction;
+		this.maxElements = maxElements;
+	}
+
+	/**
+	 * Counts the arriving data point. Once the counter has reached maxElements,
+	 * an eviction of deleteOnEviction elements is requested. The datapoint,
+	 * triggered and bufferSize parameters do not influence the result.
+	 * 
+	 * @return deleteOnEviction if the counter had reached maxElements before
+	 *         this data point arrived, 0 otherwise
+	 */
+	@Override
+	public int notifyEviction(IN datapoint, boolean triggered, int bufferSize) {
+		// The comparison has to be >= and not == to cover the case max=0
+		if (counter >= maxElements) {
+			// Adjust the counter according to the current eviction, but never
+			// let it drop below zero.
+			counter = Math.max(0, counter - deleteOnEviction);
+			// The current element will be added after the eviction.
+			// Therefore, increase the counter in any case.
+			counter++;
+			return deleteOnEviction;
+		} else {
+			counter++;
+			return 0;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d731b3b0/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
new file mode 100644
index 0000000..ca0058e
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+/**
+ * This policy triggers whenever {@code max} elements have been counted. The
+ * data point arriving after that causes the trigger and is counted as the
+ * first element of the next window. With the default start value of 0 the
+ * first trigger therefore happens on the (max+1)'th element and then on every
+ * max'th element after it.
+ * 
+ * @param <IN>
+ *            The type of the data points which are handled by this policy
+ */
+public class CountTriggerPolicy<IN> implements TriggerPolicy<IN> {
+
+	/**
+	 * Auto generated version ID
+	 */
+	private static final long serialVersionUID = -6357200688886103968L;
+
+	private static final int DEFAULT_START_VALUE = 0;
+
+	/** Counter of arriving elements; may start negative to delay the first trigger. */
+	private int counter;
+	/** The number of elements to count before a trigger occurs. */
+	private final int max;
+
+	/**
+	 * This constructor will set up a count based trigger, which triggers after
+	 * max elements have arrived.
+	 * 
+	 * @param max
+	 *            The number of arriving elements before the trigger occurs.
+	 */
+	public CountTriggerPolicy(int max) {
+		this(max, DEFAULT_START_VALUE);
+	}
+
+	/**
+	 * In addition to {@link CountTriggerPolicy#CountTriggerPolicy(int)} this
+	 * constructor allows to set a custom start value for the element counter.
+	 * This can be used to delay the first trigger by setting a negative start
+	 * value. Often the first trigger should be delayed in case of sliding
+	 * windows. For example if the size of a window should be 4 and a trigger
+	 * should happen every 2, a start value of -2 would allow to also have the
+	 * first window of size 4.
+	 * 
+	 * @param max
+	 *            The number of arriving elements before the trigger occurs.
+	 * @param startValue
+	 *            The start value for the counter of arriving elements.
+	 * @see CountTriggerPolicy#CountTriggerPolicy(int)
+	 */
+	public CountTriggerPolicy(int max, int startValue) {
+		this.max = max;
+		this.counter = startValue;
+	}
+
+	/**
+	 * Counts the arriving data point and decides whether it causes a trigger.
+	 * The value of the data point itself is not inspected.
+	 * 
+	 * @return true if the counter had reached max before this data point
+	 *         arrived, false otherwise
+	 */
+	@Override
+	public boolean notifyTrigger(IN datapoint) {
+		// The comparison has to be >= and not == to cover the case max=0
+		if (counter >= max) {
+			// The current data point will be part of the next window!
+			// Therefore the counter needs to be set to one already.
+			counter = 1;
+			return true;
+		} else {
+			counter++;
+			return false;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d731b3b0/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java
new file mode 100644
index 0000000..4773f4e
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicyTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link CountEvictionPolicy}. The policy must ignore the triggered
+ * flag and the buffer size, so both are varied arbitrarily in these tests.
+ */
+public class CountEvictionPolicyTest {
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void testCountEvictionPolicy() {
+		List<Integer> tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+		int counter;
+
+		// The count policy should not care about the triggered parameter
+		// Therefore its value switches after each use in this test.
+		boolean triggered = false;
+		// the size of the buffer should not matter as well!
+
+		// Test count of different sizes (0..9)
+		for (int i = 0; i < 10; i++) {
+			EvictionPolicy evictionPolicy = new CountEvictionPolicy(i, i);
+			counter = 0;
+
+			// Test first i steps (should not evict)
+			for (int j = 0; j < i; j++) {
+				counter++;
+				assertEquals("Evictionpolicy with count of " + i + " evicted tuples at add nr. "
+						+ counter + ". It should not evict for the first " + i + " adds.", 0,
+						evictionPolicy.notifyEviction(tuples.get(j), (triggered = !triggered),
+								tuples.get(Math.abs((i - j)) % 10)));
+			}
+
+			// Test the next three evictions
+			for (int j = 0; j < 3; j++) {
+				// The first add should evict now
+				counter++;
+				assertEquals("Evictionpolicy with count of " + i
+						+ " did not evict correct number of tuples at the expected pos " + counter
+						+ ".", i, evictionPolicy.notifyEviction(tuples.get(j),
+						(triggered = !triggered), tuples.get(Math.abs((i - j)) % 10)));
+
+				// the next i-1 adds should not evict
+				for (int k = 0; k < i - 1; k++) {
+					counter++;
+					assertEquals("Evictionpolicy with count of " + i
+							+ " evicted tuples at add nr. " + counter, 0,
+							evictionPolicy.notifyEviction(tuples.get(j), (triggered = !triggered),
+									tuples.get(Math.abs((i - j)) % 10)));
+				}
+			}
+		}
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testCountEvictionPolicyStartValuesAndEvictionAmount() {
+
+		// The count policy should not care about the triggered parameter
+		// Therefore its value switches after each use in this test.
+		boolean triggered = false;
+		// the size of the buffer should not matter as well!
+
+		List<Integer> tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+		// Test different eviction amounts (0..3)
+		for (int x = 0; x < 4; x++) {
+
+			// Test count of different sizes (0..9)
+			for (int i = 0; i < 10; i++) {
+
+				int counter = 0;
+
+				// Test different start values (-5..5). The loop has to test and
+				// advance j; advancing i here would skip sizes and never vary j.
+				for (int j = -5; j < 6; j++) {
+					EvictionPolicy evictionPolicy = new CountEvictionPolicy(i, x, j);
+					// Starting at j, the counter needs i - j adds (if positive)
+					// to reach the threshold i. None of them may evict.
+					for (int k = 0; k < ((i - j > 0) ? i - j : 0); k++) {
+						counter++;
+						assertEquals("Evictionpolicy with count of " + i
+								+ " did not evict correct number of tuples at the expected pos "
+								+ counter + ".", 0, evictionPolicy.notifyEviction(
+								tuples.get(Math.abs(j)), (triggered = !triggered),
+								tuples.get(Math.abs((i - j)) % 10)));
+					}
+					// Expect eviction
+					counter++;
+					assertEquals("Evictionpolicy with count of " + i
+							+ " did not evict correct number of tuples at the expected pos "
+							+ counter + ".", x, evictionPolicy.notifyEviction(
+							tuples.get(Math.abs(j)), (triggered = !triggered),
+							tuples.get(Math.abs((i - j)) % 10)));
+				}
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d731b3b0/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java
new file mode 100644
index 0000000..b7120e7
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicyTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.junit.Test;
+
+/**
+ * Tests for {@link CountTriggerPolicy}, created both directly and through the
+ * {@link Count} helper.
+ */
+public class CountTriggerPolicyTest {
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testCountTriggerPolicy() {
+
+		List tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+		int counter;
+
+		// Test count of different sizes (0..9)
+		for (int i = 0; i < 10; i++) {
+			TriggerPolicy triggerPolicy = Count.of(i).toTrigger();
+			counter = 0;
+
+			// Test first i steps (should not trigger)
+			for (int j = 0; j < i; j++) {
+				counter++;
+				assertFalse("Triggerpolicy with count of " + i + " triggered at add nr. " + counter
+						+ ". It should not trigger for the first " + i + " adds.",
+						triggerPolicy.notifyTrigger(tuples.get(j)));
+			}
+
+			// Test the next three triggers
+			for (int j = 0; j < 3; j++) {
+				// The first add should trigger now
+				counter++;
+				assertTrue("Triggerpolicy with count of " + i
+						+ " did not trigger at the expected pos " + counter + ".",
+						triggerPolicy.notifyTrigger(tuples.get(j)));
+
+				// the next i-1 adds should not trigger
+				for (int k = 0; k < i - 1; k++) {
+					counter++;
+					assertFalse("Triggerpolicy with count of " + i + " triggered at add nr. "
+							+ counter, triggerPolicy.notifyTrigger(tuples.get(k)));
+				}
+			}
+		}
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void testCountTriggerPolicyStartValues() {
+
+		List tuples = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+		// Test count of different sizes (0..9)
+		for (int i = 0; i < 10; i++) {
+
+			// Test different start values (-5..5). The loop has to test and
+			// advance j; advancing i here would skip sizes and never vary j.
+			for (int j = -5; j < 6; j++) {
+				TriggerPolicy triggerPolicy = new CountTriggerPolicy(i, j);
+				// Starting at j, the counter needs i - j adds (if positive)
+				// to reach the threshold i. None of them may trigger.
+				for (int k = 0; k < ((i - j > 0) ? i - j : 0); k++) {
+					assertFalse("Triggerpolicy with count of " + i + " and start value of " + j
+							+ " triggered at add nr. " + (k + 1),
+							triggerPolicy.notifyTrigger(tuples.get(k % 10)));
+				}
+				// Expect trigger
+				assertTrue("Triggerpolicy with count of " + i + " and start value of " + j
+						+ " did not trigger at the expected position.",
+						triggerPolicy.notifyTrigger(tuples.get(0)));
+			}
+		}
+	}
+}