You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:12 UTC

[07/34] incubator-flink git commit: [streaming] Created trigger and eviction policies based on data punctuation. (incl. test cases)

[streaming] Created trigger and eviction policies based on data punctuation. (incl. test cases)


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a6cc8afb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a6cc8afb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a6cc8afb

Branch: refs/heads/master
Commit: a6cc8afb89f61d6067f4ccf85b50a5899630346a
Parents: 05c1519
Author: Jonas Traub (powibol) <jo...@s-traub.com>
Authored: Mon Oct 27 13:44:02 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Fri Dec 5 16:45:08 2014 +0100

----------------------------------------------------------------------
 .../api/windowing/policy/PunctuationPolicy.java | 110 +++++++++++++++++
 .../windowing/policy/PunctuationPolicyTest.java | 123 +++++++++++++++++++
 2 files changed, 233 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a6cc8afb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java
new file mode 100644
index 0000000..0ec9a57
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+
+/**
+ * This policy can be used to trigger and evict based on a punctuation which is
+ * present within the arriving data. Using this policy, one can react to an
+ * externally defined arbitrary windowing semantic.
+ *
+ * In case this policy is used for eviction, a detected punctuation evicts the
+ * elements counted since (and including) the previous punctuation.
+ *
+ * By default this policy does not react on fake elements. Wrap it in an
+ * {@link ActiveEvictionPolicyWrapper} to make it react on punctuation even in fake elements.
+ *
+ * @param <IN>
+ *            The type of the input data handled by this policy. An
+ *            {@link Extractor} can be used to extract DATA from IN.
+ * @param <DATA>
+ *            The type of the punctuation. An {@link Extractor} can be used to
+ *            extract DATA from IN.
+ */
+public class PunctuationPolicy<IN, DATA> implements TriggerPolicy<IN>, EvictionPolicy<IN> {
+
+	/**
+	 * auto generated version id
+	 */
+	private static final long serialVersionUID = -8845130188912602498L;
+
+	// Number of data points seen since (and including) the last punctuation.
+	private int counter = 0;
+	// Optional; null means the data point itself is compared with the punctuation.
+	private final Extractor<IN, DATA> extractor;
+	// The value whose occurrence in the data leads to trigger/evict.
+	private final DATA punctuation;
+
+	/**
+	 * Creates the punctuation policy without using any extractor. For this to
+	 * work, IN and DATA must be the same type.
+	 *
+	 * @param punctuation
+	 *            the punctuation which leads to trigger/evict.
+	 */
+	public PunctuationPolicy(DATA punctuation) {
+		this(punctuation, null);
+	}
+
+	/**
+	 * Creates the punctuation policy which uses the specified extractor to
+	 * isolate the punctuation from the data.
+	 *
+	 * @param punctuation
+	 *            the punctuation which leads to trigger/evict.
+	 * @param extractor
+	 *            An {@link Extractor} which converts IN to DATA.
+	 */
+	public PunctuationPolicy(DATA punctuation, Extractor<IN, DATA> extractor) {
+		this.punctuation = punctuation;
+		this.extractor = extractor;
+	}
+
+	/**
+	 * On a punctuation, returns the number of data points counted since (and including) the
+	 * previous punctuation; otherwise returns 0. triggered and bufferSize are not used.
+	 */
+	@Override
+	public int notifyEviction(IN datapoint, boolean triggered, int bufferSize) {
+		if (!notifyTrigger(datapoint)) {
+			counter++;
+			return 0;
+		}
+		int evicted = counter;
+		// The current data point is added after the eviction, so it is already
+		// counted here as the first element of the next run.
+		counter = 1;
+		return evicted;
+	}
+
+	/**
+	 * Returns true if the (possibly extracted) data equals the punctuation.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public boolean notifyTrigger(IN datapoint) {
+		// Without an extractor, IN and DATA are expected to be the same type;
+		// the cast is unchecked because this cannot be verified here.
+		DATA data = extractor == null ? (DATA) datapoint : extractor.extract(datapoint);
+
+		// Null-safe comparison: a null punctuation matches only null data.
+		return punctuation == null ? data == null : punctuation.equals(data);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a6cc8afb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java
new file mode 100644
index 0000000..8224b12
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicyTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.windowing.extractor.FieldFromTuple;
+import org.junit.Test;
+
+/** Tests the trigger and eviction behavior of {@link PunctuationPolicy}. */
+public class PunctuationPolicyTest {
+
+	// Must not affect the policy; it is flipped on every call to verify this.
+	private boolean triggered = false;
+
+	/** A data point equal to the punctuation triggers; a different one does not. */
+	@Test
+	public void PunctuationTriggerTestWithoutExtraction() {
+		PunctuationPolicy<Object, Object> policy = new PunctuationPolicy<Object, Object>(new TestObject(0));
+		assertTrue("The present punctuation was not detected. (POS 1)",
+				policy.notifyTrigger(new TestObject(0)));
+		assertFalse("There was a punctuation detected which wasn't present. (POS 2)",
+				policy.notifyTrigger(new TestObject(1)));
+	}
+
+	/** With FieldFromTuple(0) as extractor, a tuple triggers only if its first field equals the punctuation. */
+	@Test
+	public void PunctuationTriggerTestWithExtraction() {
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		PunctuationPolicy<Tuple2<Object, Object>, Object> policy = new PunctuationPolicy<Tuple2<Object, Object>, Object>(
+				new TestObject(0), new FieldFromTuple(0));
+		assertTrue("The present punctuation was not detected. (POS 3)",
+				policy.notifyTrigger(new Tuple2<Object, Object>(new TestObject(0),
+						new TestObject(1))));
+		assertFalse("There was a punctuation detected which wasn't present. (POS 4)",
+				policy.notifyTrigger(new Tuple2<Object, Object>(new TestObject(1),
+						new TestObject(0))));
+	}
+
+	/** A punctuation evicts everything counted since (and including) the previous punctuation. */
+	@Test
+	public void PunctuationEvictionTestWithoutExtraction() {
+		// The buffer size should not affect the policy; it is therefore always 0 here.
+		PunctuationPolicy<Object, Object> policy = new PunctuationPolicy<Object, Object>(new TestObject(0));
+		assertEquals(
+				"The present punctuation was not detected or the number of deleted tuples was wrong. (POS 5)",
+				0, policy.notifyEviction(new TestObject(0), (triggered = !triggered), 0));
+		for (int i = 0; i < 10; i++) {
+			for (int j = 0; j < i; j++) {
+				assertEquals("There was a punctuation detected which wasn't present. (POS 6)", 0,
+						policy.notifyEviction(new TestObject(1), (triggered = !triggered), 0));
+			}
+			assertEquals(
+					"The present punctuation was not detected or the number of deleted tuples was wrong. (POS 7)",
+					i + 1, policy.notifyEviction(new TestObject(0), (triggered = !triggered), 0));
+		}
+	}
+
+	/** Same eviction scenario, with the punctuation extracted from the first field of a tuple. */
+	@Test
+	public void PunctuationEvictionTestWithExtraction() {
+		// The buffer size should not affect the policy; it is therefore always 0 here.
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		PunctuationPolicy<Tuple2<Object, Object>, Object> policy = new PunctuationPolicy<Tuple2<Object, Object>, Object>(
+				new TestObject(0), new FieldFromTuple(0));
+		assertEquals(
+				"The present punctuation was not detected or the number of deleted tuples was wrong. (POS 8)",
+				0, policy.notifyEviction(new Tuple2<Object, Object>(new TestObject(0),
+						new TestObject(1)), (triggered = !triggered), 0));
+		for (int i = 0; i < 10; i++) {
+			for (int j = 0; j < i; j++) {
+				assertEquals("There was a punctuation detected which wasn't present. (POS 9)", 0,
+						policy.notifyEviction(new Tuple2<Object, Object>(new TestObject(1),
+								new TestObject(0)), (triggered = !triggered), 0));
+			}
+			assertEquals(
+					"The present punctuation was not detected or the number of deleted tuples was wrong. (POS 10)",
+					i + 1, policy.notifyEviction(new Tuple2<Object, Object>(new TestObject(0),
+							new TestObject(1)), (triggered = !triggered), 0));
+		}
+	}
+
+	/** Value object compared by id; hashCode is kept consistent with equals. */
+	private static final class TestObject {
+
+		private final int id;
+
+		public TestObject(int id) {
+			this.id = id;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			return o instanceof TestObject && ((TestObject) o).getId() == this.id;
+		}
+
+		@Override
+		public int hashCode() {
+			return id;
+		}
+
+		public int getId() {
+			return id;
+		}
+	}
+
+}