You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:25 UTC

[20/34] incubator-flink git commit: [streaming] Added test cases for the policy based windowing invokable.

[streaming] Added test cases for the policy based windowing invokable.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3f506835
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3f506835
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3f506835

Branch: refs/heads/master
Commit: 3f506835b6d2fbe13a91e74d34de82b584cddcd1
Parents: 542c66f
Author: Jonas Traub (powibol) <jo...@s-traub.com>
Authored: Tue Oct 28 10:12:36 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Fri Dec 5 16:45:09 2014 +0100

----------------------------------------------------------------------
 .../operator/WindowingInvokableTest.java        | 272 +++++++++++++++++++
 1 file changed, 272 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3f506835/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
new file mode 100644
index 0000000..79df1c5
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.streaming.util.MockInvokable;
+import org.junit.Test;
+
+public class WindowingInvokableTest {
+
+	/**
+	 * Test case equal to {@link WindowReduceInvokableTest}
+	 */
+	@Test
+	public void testWindowingInvokableWithTimePolicy() {
+
+		List<Integer> inputs = new ArrayList<Integer>();
+		inputs.add(1);
+		inputs.add(2);
+		inputs.add(2);
+		inputs.add(3);
+		inputs.add(4);
+		inputs.add(5);
+		inputs.add(10);
+		inputs.add(11);
+		inputs.add(11);
+		// 1,2,3,4-3,4,5,6-5,6,7,8-7,8,9,10-9,10,11
+		// 12-12-5-10-32
+
+		List<Integer> expected = new ArrayList<Integer>();
+		expected.add(12);
+		expected.add(12);
+		expected.add(5);
+		expected.add(10);
+		expected.add(32);
+
+		TimeStamp<Integer> myTimeStamp = new TimeStamp<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public long getTimestamp(Integer value) {
+				return value;
+			}
+
+			@Override
+			public long getStartTime() {
+				return 1;
+			}
+		};
+
+		ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer reduce(Integer value1, Integer value2) throws Exception {
+				return value1 + value2;
+			}
+		};
+
+		LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>();
+		// Trigger every 2 time units but delay the first trigger by 2 (First
+		// trigger after 4, then every 2)
+		triggers.add(new TimeTriggerPolicy<Integer>(2L, myTimeStamp, 2L,
+				new Extractor<Long, Integer>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Integer extract(Long in) {
+						return in.intValue();
+					}
+				}));
+		LinkedList<EvictionPolicy<Integer>> evictions = new LinkedList<EvictionPolicy<Integer>>();
+		// Always delete all elements older then 4
+		evictions.add(new TimeEvictionPolicy<Integer>(4L, myTimeStamp));
+
+		WindowingInvokable<Integer> invokable = new WindowingInvokable<Integer>(myReduceFunction,
+				triggers, evictions);
+
+		ArrayList<Integer> result = new ArrayList<Integer>();
+		for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable, inputs)) {
+			result.add(t.f0);
+		}
+
+		assertEquals(expected, result);
+	}
+
+	/**
+	 * Test case equal to {@link BatchReduceTest}
+	 */
+	@Test
+	public void testWindowingInvokableWithCountPolicy() {
+
+		List<Integer> inputs = new ArrayList<Integer>();
+		for (Integer i = 1; i <= 10; i++) {
+			inputs.add(i);
+		}
+
+		ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer reduce(Integer value1, Integer value2) throws Exception {
+				return value1 + value2;
+			}
+		};
+
+		/*
+		 * The following setup reassembles the batch size 3 and the slide size 2
+		 * of the BatchReduceInvokable.
+		 */
+		LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>();
+		// Trigger on every 2nd element, but the first time after the 3rd
+		triggers.add(new CountTriggerPolicy<Integer>(2, -1));
+
+		LinkedList<EvictionPolicy<Integer>> evictions = new LinkedList<EvictionPolicy<Integer>>();
+		// On every 2nd element, remove the oldest 2 elements, but the first
+		// time after the 3rd element
+		evictions.add(new CountEvictionPolicy<Integer>(2, 2, -1));
+
+		WindowingInvokable<Integer> invokable = new WindowingInvokable<Integer>(myReduceFunction,
+				triggers, evictions);
+
+		List<Integer> expected = new ArrayList<Integer>();
+		expected.add(6);
+		expected.add(12);
+		expected.add(18);
+		expected.add(24);
+		expected.add(19);
+		List<Integer> result = new ArrayList<Integer>();
+		for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable, inputs)) {
+			result.add(t.f0);
+		}
+		assertEquals(expected, result);
+
+		/*
+		 * Begin test part 2
+		 */
+
+		List<Integer> inputs2 = new ArrayList<Integer>();
+		inputs2.add(1);
+		inputs2.add(2);
+		inputs2.add(-5); // changed this value to make sure it is excluded from
+							// the result
+		inputs2.add(-3);
+		inputs2.add(-4);
+
+		myReduceFunction = new ReduceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer reduce(Integer value1, Integer value2) throws Exception {
+				if (value1 <= value2) {
+					return value1;
+				} else {
+					return value2;
+				}
+			};
+		};
+
+		/*
+		 * The following setup reassembles the batch size 2 and the slide size 3
+		 * of the BatchReduceInvokable.
+		 */
+		triggers = new LinkedList<TriggerPolicy<Integer>>();
+		// Trigger after every 3rd element, but the first time after the 2nd
+		triggers.add(new CountTriggerPolicy<Integer>(3, 1));
+
+		evictions = new LinkedList<EvictionPolicy<Integer>>();
+		// On every 3rd element, remove the oldest 3 elements, but the first
+		// time after on the 5th element
+		evictions.add(new CountEvictionPolicy<Integer>(3, 3, -1));
+
+		WindowingInvokable<Integer> invokable2 = new WindowingInvokable<Integer>(myReduceFunction,
+				triggers, evictions);
+
+		List<Integer> expected2 = new ArrayList<Integer>();
+		expected2.add(1);
+		expected2.add(-4);
+
+		result = new ArrayList<Integer>();
+		for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable2, inputs2)) {
+			result.add(t.f0);
+		}
+
+		assertEquals(expected2, result);
+
+	}
+
+	@Test
+	public void testWindowingInvokableWithMultiplePolicies() {
+		LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>();
+		triggers.add(new CountTriggerPolicy<Integer>(2));
+		triggers.add(new CountTriggerPolicy<Integer>(3));
+
+		LinkedList<EvictionPolicy<Integer>> evictions = new LinkedList<EvictionPolicy<Integer>>();
+		evictions.add(new CountEvictionPolicy<Integer>(2,2));
+		evictions.add(new CountEvictionPolicy<Integer>(3,3));
+
+		List<Integer> inputs = new ArrayList<Integer>();
+		for (Integer i = 1; i <= 10; i++) {
+			inputs.add(i);
+		}
+		/**
+		 * <code>
+		 * VAL: 1,2,3,4,5,6,7,8,9,10
+		 * TR1:   |   |   |   |   |
+		 * TR2:     |     |     |
+		 * EV1:   2   2   2   2   2
+		 * EV2:     3     3     3
+		 * </code>
+		 */
+
+		List<Integer> expected = new ArrayList<Integer>();
+		expected.add(3);
+		expected.add(3);
+		expected.add(4);
+		expected.add(11);
+		expected.add(15);
+		expected.add(9);
+		expected.add(10);
+
+		ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer reduce(Integer value1, Integer value2) throws Exception {
+				return value1 + value2;
+			}
+		};
+
+		WindowingInvokable<Integer> invokable = new WindowingInvokable<Integer>(myReduceFunction,
+				triggers, evictions);
+
+		ArrayList<Integer> result = new ArrayList<Integer>();
+		for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable, inputs)) {
+			result.add(t.f0);
+		}
+
+		assertEquals(expected, result);
+	}
+
+}