You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:14 UTC
[09/34] incubator-flink git commit: [streaming] Created active
trigger and eviction policies based on time. Included respective policy
helper and test cases.
[streaming] Created active trigger and eviction policies based on time. Included respective policy helper and test cases.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6f3bf576
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6f3bf576
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6f3bf576
Branch: refs/heads/master
Commit: 6f3bf576c68e1884393016ef46297236367522eb
Parents: 40c3ae4
Author: Jonas Traub (powibol) <jo...@s-traub.com>
Authored: Mon Oct 27 13:31:00 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Fri Dec 5 16:45:08 2014 +0100
----------------------------------------------------------------------
.../streaming/api/windowing/helper/Time.java | 125 ++++++++++++
.../windowing/policy/TimeEvictionPolicy.java | 109 +++++++++++
.../api/windowing/policy/TimeTriggerPolicy.java | 193 +++++++++++++++++++
.../policy/TimeEvictionPolicyTest.java | 125 ++++++++++++
.../windowing/policy/TimeTriggerPolicyTest.java | 137 +++++++++++++
5 files changed, 689 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6f3bf576/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
new file mode 100644
index 0000000..37f2902
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.helper;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+/**
+ * Helper which describes a time based trigger or eviction policy. The
+ * policies created by this helper measure time with a
+ * {@link DefaultTimeStamp} (by default {@link System#currentTimeMillis()}).
+ *
+ * @param <DATA>
+ *            The data type which is handled by the time stamp used in the
+ *            policy represented by this helper
+ */
+public class Time<DATA> implements WindowingHelper<DATA> {
+
+	// Number of granularity units which make up the duration.
+	private final int timeVal;
+	// Unit in which timeVal is expressed.
+	private final TimeUnit granularity;
+	// Wraps a point in time into a (fake) element of type DATA.
+	private final Extractor<Long, DATA> timeToData;
+
+	/**
+	 * Creates a helper representing a trigger which triggers every given
+	 * timeVal or an eviction which evicts all elements older than timeVal.
+	 *
+	 * @param timeVal
+	 *            The number of time units
+	 * @param granularity
+	 *            The unit of time such as minute or millisecond. The
+	 *            duration is converted with {@link TimeUnit#toMillis(long)},
+	 *            so milliseconds are the smallest usable granularity;
+	 *            durations given in finer units are truncated.
+	 * @param timeToData
+	 *            The trigger policy creates fake elements to not miss
+	 *            windows in case no element arrived within the duration of
+	 *            the window. This extractor should wrap a long into such an
+	 *            element of type DATA.
+	 */
+	public Time(int timeVal, TimeUnit granularity, Extractor<Long, DATA> timeToData) {
+		this.timeVal = timeVal;
+		this.granularity = granularity;
+		this.timeToData = timeToData;
+	}
+
+	/**
+	 * Creates a helper representing a trigger which triggers every given
+	 * timeVal or an eviction which evicts all elements older than timeVal.
+	 *
+	 * The granularity for timeVal used by this constructor is seconds.
+	 *
+	 * @param timeVal
+	 *            The number of time units measured in seconds.
+	 * @param timeToData
+	 *            The trigger policy creates fake elements to not miss
+	 *            windows in case no element arrived within the duration of
+	 *            the window. This extractor should wrap a long into such an
+	 *            element of type DATA.
+	 */
+	public Time(int timeVal, Extractor<Long, DATA> timeToData) {
+		this(timeVal, TimeUnit.SECONDS, timeToData);
+	}
+
+	/**
+	 * @return a new {@link TimeEvictionPolicy} whose granularity is the
+	 *         configured duration in milliseconds.
+	 */
+	@Override
+	public EvictionPolicy<DATA> toEvict() {
+		return new TimeEvictionPolicy<DATA>(granularityInMillis(), new DefaultTimeStamp<DATA>());
+	}
+
+	/**
+	 * @return a new {@link TimeTriggerPolicy} whose granularity is the
+	 *         configured duration in milliseconds and which uses the
+	 *         extractor passed to this helper to create fake elements.
+	 */
+	@Override
+	public TriggerPolicy<DATA> toTrigger() {
+		return new TimeTriggerPolicy<DATA>(granularityInMillis(), new DefaultTimeStamp<DATA>(),
+				timeToData);
+	}
+
+	/**
+	 * Creates a helper representing a trigger which triggers every given
+	 * timeVal or an eviction which evicts all elements older than timeVal.
+	 *
+	 * @param timeVal
+	 *            The number of time units
+	 * @param granularity
+	 *            The unit of time such as minute or millisecond. The
+	 *            duration is converted with {@link TimeUnit#toMillis(long)},
+	 *            so milliseconds are the smallest usable granularity;
+	 *            durations given in finer units are truncated.
+	 * @param timeToData
+	 *            The trigger policy creates fake elements to not miss
+	 *            windows in case no element arrived within the duration of
+	 *            the window. This extractor should wrap a long into such an
+	 *            element of type DATA.
+	 * @return a helper representing a trigger which triggers every given
+	 *         timeVal or an eviction which evicts all elements older than
+	 *         timeVal.
+	 */
+	public static <DATA> Time<DATA> of(int timeVal, TimeUnit granularity,
+			Extractor<Long, DATA> timeToData) {
+		return new Time<DATA>(timeVal, granularity, timeToData);
+	}
+
+	/**
+	 * @return the configured duration converted to milliseconds.
+	 */
+	private long granularityInMillis() {
+		return this.granularity.toMillis(this.timeVal);
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6f3bf576/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
new file mode 100644
index 0000000..a78cc4e
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import java.util.LinkedList;
+
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+
+/**
+ * Eviction policy which removes every buffered element that is older than a
+ * given time span, relative to the element currently being processed. Time
+ * is read through a {@link TimeStamp} implementation and is always expressed
+ * as a long, so the time span (granularity) is a long as well.
+ *
+ * @param <DATA>
+ *            The type of the incoming data points which are processed by this
+ *            policy.
+ */
+public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA> {
+
+	/**
+	 * auto generated version id
+	 */
+	private static final long serialVersionUID = -1457476766124518220L;
+
+	private long granularity;
+	private TimeStamp<DATA> timestamp;
+	// The policy's own view of the window buffer, oldest element first.
+	private LinkedList<DATA> buffer = new LinkedList<DATA>();
+
+	/**
+	 * Creates a policy which evicts all elements whose time is smaller than
+	 * the time of the current element minus the granularity:
+	 *
+	 * <pre>
+	 * {@code
+	 * while (time(firstInBuffer) < currentTime - granularity) {
+	 *     evict firstInBuffer;
+	 * }
+	 * }
+	 * </pre>
+	 *
+	 * @param granularity
+	 *            The maximum age of an element. If this value is set to 2,
+	 *            every element X with
+	 *            {@code time(X) < currentTime - 2} is evicted.
+	 * @param timestamp
+	 *            The {@link TimeStamp} to measure the time with. This can be
+	 *            either user defined or provided by the API.
+	 */
+	public TimeEvictionPolicy(long granularity, TimeStamp<DATA> timestamp) {
+		this.granularity = granularity;
+		this.timestamp = timestamp;
+	}
+
+	/**
+	 * Handles a real element: expired elements are counted for eviction and
+	 * the element itself is remembered by the policy.
+	 */
+	@Override
+	public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) {
+		return notifyEviction(datapoint, triggered, bufferSize, false);
+	}
+
+	/**
+	 * Handles a fake element: expired elements are counted for eviction, but
+	 * the fake element itself is not remembered by the policy.
+	 */
+	@Override
+	public int notifyEvictionWithFakeElement(DATA datapoint, int bufferSize) {
+		return notifyEviction(datapoint, true, bufferSize, true);
+	}
+
+	/**
+	 * Shared implementation for real and fake elements.
+	 *
+	 * @param datapoint
+	 *            The element which defines the current time.
+	 * @param triggered
+	 *            Not evaluated by this policy.
+	 * @param bufferSize
+	 *            The current size of the window buffer; used to detect
+	 *            elements which were deleted by other policies.
+	 * @param isFake
+	 *            If true, the datapoint is not added to the internal buffer.
+	 * @return The number of elements to evict.
+	 */
+	private int notifyEviction(DATA datapoint, boolean triggered, int bufferSize, boolean isFake) {
+		// Forget the oldest elements until the internal buffer is no longer
+		// larger than the real one (elements deleted by other policies).
+		while (this.buffer.size() > bufferSize) {
+			this.buffer.removeFirst();
+		}
+
+		// Drop expired elements from the front and count them.
+		int evicted = 0;
+		while (!buffer.isEmpty()
+				&& timestamp.getTimestamp(buffer.getFirst()) < timestamp.getTimestamp(datapoint)
+						- granularity) {
+			buffer.removeFirst();
+			evicted++;
+		}
+
+		// Fake elements never enter the buffer.
+		if (isFake) {
+			return evicted;
+		}
+
+		buffer.addLast(datapoint);
+		return evicted;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6f3bf576/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
new file mode 100644
index 0000000..d50e285
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import java.util.LinkedList;
+
+import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+
+/**
+ * This trigger policy triggers with regard to the time. The time is measured
+ * using a given {@link TimeStamp} implementation. A point in time is always
+ * represented as long. Therefore, parameters such as granularity and delay
+ * can be set as long value as well.
+ *
+ * All methods which read or modify the current window start are
+ * synchronized, because the runnable returned by
+ * {@link #createActiveTriggerRunnable(ActiveTriggerCallback)} modifies it as
+ * well.
+ *
+ * @param <DATA>
+ *            The type of the incoming data points which are processed by this
+ *            policy.
+ */
+public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA> {
+
+	/**
+	 * auto generated version id
+	 */
+	private static final long serialVersionUID = -5122753802440196719L;
+
+	// Start (inclusive) of the current window; advanced on every trigger.
+	private long startTime;
+	private final long granularity;
+	private final TimeStamp<DATA> timestamp;
+	private final Extractor<Long, DATA> longToDATAExtractor;
+
+	/**
+	 * This trigger policy triggers with regard to the time. The time is
+	 * measured using a given {@link TimeStamp} implementation. A point in
+	 * time is always represented as long. Therefore, parameters such as
+	 * granularity can be set as long value as well. If this value for the
+	 * granularity is set to 2 for example, the policy will trigger at every
+	 * second point in time.
+	 *
+	 * @param granularity
+	 *            The granularity of the trigger. If this value is set to 2 the
+	 *            policy will trigger at every second time point
+	 * @param timestamp
+	 *            The {@link TimeStamp} to measure the time with. This can be
+	 *            either user defined or provided by the API.
+	 * @param timeWrapper
+	 *            This policy creates fake elements to not miss windows in case
+	 *            no element arrived within the duration of the window. This
+	 *            extractor should wrap a long into such an element of type
+	 *            DATA.
+	 */
+	public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp,
+			Extractor<Long, DATA> timeWrapper) {
+		this(granularity, timestamp, 0, timeWrapper);
+	}
+
+	/**
+	 * This is mostly the same as
+	 * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimeStamp, Extractor)}.
+	 * In addition to granularity and timestamp a delay can be specified for
+	 * the first trigger. If the start time given by the timestamp is x, the
+	 * delay is y, and the granularity is z, the first trigger will happen at
+	 * x+y+z.
+	 *
+	 * @param granularity
+	 *            The granularity of the trigger. If this value is set to 2 the
+	 *            policy will trigger at every second time point
+	 * @param timestamp
+	 *            The {@link TimeStamp} to measure the time with. This can be
+	 *            either user defined or provided by the API.
+	 * @param delay
+	 *            A delay for the first trigger. If the start time given by the
+	 *            timestamp is x, the delay is y, and the granularity is z, the
+	 *            first trigger will happen at x+y+z.
+	 * @param timeWrapper
+	 *            This policy creates fake elements to not miss windows in case
+	 *            no element arrived within the duration of the window. This
+	 *            extractor should wrap a long into such an element of type
+	 *            DATA.
+	 */
+	public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp, long delay,
+			Extractor<Long, DATA> timeWrapper) {
+		this.startTime = timestamp.getStartTime() + delay;
+		this.timestamp = timestamp;
+		this.granularity = granularity;
+		this.longToDATAExtractor = timeWrapper;
+	}
+
+	/**
+	 * Triggers if the time of the data point lies at or behind the end of the
+	 * current window. In that case the window start is moved to the latest
+	 * window border which is not behind the data point.
+	 *
+	 * @param datapoint
+	 *            The arrived data point.
+	 * @return true if the data point caused a trigger, false otherwise.
+	 */
+	@Override
+	public synchronized boolean notifyTrigger(DATA datapoint) {
+		long recordTime = timestamp.getTimestamp(datapoint);
+		// start time is included, but end time is excluded: >=
+		if (recordTime >= startTime + granularity) {
+			// A granularity of 0 triggers on every element; the modulo
+			// below would divide by zero, so the start time is kept.
+			if (granularity != 0) {
+				startTime = recordTime - ((recordTime - startTime) % granularity);
+			}
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	/**
+	 * This method checks if we missed a window end. If this is the case we
+	 * trigger the missed windows using fake elements.
+	 *
+	 * Note that the returned array is created by
+	 * {@link LinkedList#toArray()}, so its runtime type is Object[].
+	 *
+	 * @param datapoint
+	 *            The arrived data point.
+	 * @return One fake element per missed window border (possibly none). Each
+	 *         fake element wraps the time of the respective border.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public synchronized DATA[] preNotifyTrigger(DATA datapoint) {
+		LinkedList<DATA> fakeElements = new LinkedList<DATA>();
+		// With a granularity <= 0 the window border would never move past
+		// the data point and the loop below would not terminate. No window
+		// border can be missed in that case, so no fake element is created.
+		if (granularity > 0) {
+			// read the time once, so that all borders are compared against
+			// the same point in time
+			long recordTime = timestamp.getTimestamp(datapoint);
+			// check if there is more than one window border missed
+			// use > here. In case >= would fit, the regular call will do
+			// the job.
+			while (recordTime > startTime + granularity) {
+				startTime += granularity;
+				fakeElements.add(longToDATAExtractor.extract(startTime));
+			}
+		}
+		return (DATA[]) fakeElements.toArray();
+	}
+
+	/**
+	 * In case {@link DefaultTimeStamp} is used, a runnable is returned which
+	 * triggers based on the current system time. If any other time measure is
+	 * used the method returns null.
+	 *
+	 * @param callback
+	 *            The object which takes the callbacks for adding fake
+	 *            elements out of the runnable.
+	 * @return A runnable which triggers based on the current system time. If
+	 *         any other time measure is used the method returns null.
+	 */
+	@Override
+	public Runnable createActiveTriggerRunnable(ActiveTriggerCallback<DATA> callback) {
+		if (this.timestamp instanceof DefaultTimeStamp) {
+			return new TimeCheck(callback);
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * This method is only called in case the runnable triggers a window end
+	 * according to the {@link DefaultTimeStamp}. If the current system time
+	 * reached the end of the current window, the window start is advanced by
+	 * exactly one granularity and a fake element wrapping the new window
+	 * start is sent to the callback.
+	 *
+	 * @param callback
+	 *            The callback object.
+	 */
+	private synchronized void activeFakeElementEmission(ActiveTriggerCallback<DATA> callback) {
+
+		if (System.currentTimeMillis() >= startTime + granularity) {
+			// Advance only once per emitted fake element. Advancing twice
+			// would skip a window and stamp the fake element one
+			// granularity too late.
+			startTime += granularity;
+			callback.sendFakeElement(longToDATAExtractor.extract(startTime));
+		}
+
+	}
+
+	/**
+	 * Runnable which periodically checks whether a window end was reached
+	 * according to the system time. It runs until its thread is interrupted.
+	 */
+	private class TimeCheck implements Runnable {
+		private final ActiveTriggerCallback<DATA> callback;
+
+		public TimeCheck(ActiveTriggerCallback<DATA> callback) {
+			this.callback = callback;
+		}
+
+		@Override
+		public void run() {
+			while (true) {
+				// wait for the specified granularity
+				try {
+					Thread.sleep(granularity);
+				} catch (InterruptedException e) {
+					// An interrupt is the only way to stop this loop.
+					// Restore the interrupt flag for the owner of the
+					// thread and terminate.
+					Thread.currentThread().interrupt();
+					return;
+				}
+				// Trigger using the respective methods. Methods are
+				// synchronized to prevent race conditions between real and
+				// fake elements at the policy.
+				activeFakeElementEmission(callback);
+			}
+		}
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6f3bf576/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
new file mode 100644
index 0000000..303c66a
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import java.util.LinkedList;
+
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests {@link TimeEvictionPolicy} with real and fake elements while another
+ * (simulated) policy deletes elements from the buffer as well.
+ */
+public class TimeEvictionPolicyTest {
+
+	/**
+	 * Feeds a fixed sequence of points in time into policies with
+	 * granularities from 0 to 39 and checks after each element that the
+	 * policy evicted exactly the expired elements of a reference buffer.
+	 */
+	@Test
+	public void timeEvictionTest() {
+		// create some test data
+		Integer[] times = { 1, 3, 4, 6, 7, 9, 14, 20, 21, 22, 30, 31, 33, 36, 40, 41, 42, 43, 44,
+				45, 47, 55 };
+		// number of elements which are deleted "by another policy" after the
+		// respective element was processed (applied cyclically)
+		Integer[] numToDelete = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 0, 0, 0, 0, 3 };
+
+		// create a timestamp which uses the element itself as time
+		@SuppressWarnings("serial")
+		TimeStamp<Integer> timeStamp = new TimeStamp<Integer>() {
+
+			@Override
+			public long getTimestamp(Integer value) {
+				return value;
+			}
+
+			@Override
+			public long getStartTime() {
+				return 0;
+			}
+
+		};
+
+		// test different granularity
+		for (long granularity = 0; granularity < 40; granularity++) {
+			// create policy
+			TimeEvictionPolicy<Integer> policy = new TimeEvictionPolicy<Integer>(granularity,
+					timeStamp);
+
+			// The trigger status should not affect the policy. Therefore, its
+			// value is changed after each usage.
+			boolean triggered = false;
+
+			// The eviction should work the same way with both fake and real
+			// elements. Which kind is used is changed on every 3rd element in
+			// this test.
+			int fakeAndRealCounter = 0;
+			boolean fake = false;
+
+			// test by adding values
+			LinkedList<Integer> buffer = new LinkedList<Integer>();
+			for (int i = 0; i < times.length; i++) {
+
+				// check if the current element should be a fake
+				fakeAndRealCounter++;
+				if (fakeAndRealCounter > 2) {
+					fake = !fake;
+					fakeAndRealCounter = 0;
+				}
+
+				int result;
+
+				if (fake) {
+					// Notify eviction with fake element
+					result = policy.notifyEvictionWithFakeElement(times[i], buffer.size());
+				} else {
+					// Notify eviction with real element
+					result = policy.notifyEviction(times[i], (triggered = !triggered),
+							buffer.size());
+				}
+
+				// handle correctness of eviction
+				for (; result > 0 && !buffer.isEmpty(); result--) {
+					if (buffer.getFirst() < times[i] - granularity) {
+						buffer.removeFirst();
+					} else {
+						fail("The policy wanted to evict time " + buffer.getFirst()
+								+ " while the current time was " + times[i]
+								+ " and the granularity was " + granularity);
+					}
+				}
+
+				// test that all required evictions have been done
+				if (!buffer.isEmpty()) {
+					assertTrue("The policy did not evict " + buffer.getFirst()
+							+ " while the current time was " + times[i]
+							+ " and the granularity was " + granularity,
+							(buffer.getFirst() >= times[i] - granularity));
+				}
+
+				// test influence of other evictions
+				for (int j = numToDelete[i % numToDelete.length]; j > 0; j--) {
+					if (!buffer.isEmpty()) {
+						buffer.removeFirst();
+					}
+				}
+
+				// add current element to buffer if it is no fake
+				if (!fake) {
+					buffer.add(times[i]);
+				}
+
+			}
+		}
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6f3bf576/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
new file mode 100644
index 0000000..839564d
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests the regular and the pre-notification of {@link TimeTriggerPolicy}
+ * using integers which serve as their own point in time.
+ */
+public class TimeTriggerPolicyTest {
+
+	/**
+	 * Compares the result of notifyTrigger with a reference computation of the
+	 * window borders for the granularities 0 to 30.
+	 */
+	@Test
+	public void timeTriggerRegularNotifyTest() {
+		// points in time which are fed into the policy
+		Integer[] times = { 1, 3, 4, 6, 7, 9, 14, 20, 21, 22, 30 };
+
+		TimeStamp<Integer> timeStamp = identityTimeStamp();
+
+		// test different granularity
+		for (long granularity = 0; granularity < 31; granularity++) {
+			TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity, timeStamp,
+					longToInteger());
+
+			// Reference window border.
+			// Remark: This might NOT work in case the timeStamp uses
+			// System.currentTimeMillis to determine the start time.
+			long windowStart = timeStamp.getStartTime();
+
+			for (int pos = 0; pos < times.length; pos++) {
+				boolean triggered = policy.notifyTrigger(times[pos]);
+
+				// start time is included, but end time is excluded: >=
+				boolean triggerExpected = times[pos] >= windowStart + granularity;
+				if (triggerExpected && granularity != 0) {
+					windowStart = times[pos] - ((times[pos] - windowStart) % granularity);
+				}
+
+				String state = " (current time border: " + windowStart
+						+ "; current granularity: " + granularity + "; data point time: "
+						+ times[pos] + ")";
+				if (triggerExpected) {
+					assertTrue("The policy did not trigger at pos " + pos + state, triggered);
+				} else {
+					assertFalse("The policy triggered wrong at pos " + pos + state, triggered);
+				}
+			}
+		}
+
+	}
+
+	/**
+	 * Checks that preNotifyTrigger returns one fake element per missed window
+	 * border for a policy with granularity 5.
+	 */
+	@Test
+	public void timeTriggerPreNotifyTest() {
+		// points in time which are fed into the policy
+		Integer[] times = { 1, 3, 20, 26 };
+
+		TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5,
+				identityTimeStamp(), longToInteger());
+
+		// fake elements expected for the respective entry of times
+		Integer[][] expectedFakes = { {}, {}, { 5, 10, 15 }, { 25 } };
+
+		for (int pos = 0; pos < times.length; pos++) {
+			Integer time = times[pos];
+			arrayEqualityCheck(expectedFakes[pos], policy.preNotifyTrigger(time));
+			policy.notifyTrigger(time);
+		}
+	}
+
+	/**
+	 * @return a timestamp which starts at 0 and uses the value of an element
+	 *         as its time.
+	 */
+	@SuppressWarnings("serial")
+	private static TimeStamp<Integer> identityTimeStamp() {
+		return new TimeStamp<Integer>() {
+
+			@Override
+			public long getTimestamp(Integer value) {
+				return value;
+			}
+
+			@Override
+			public long getStartTime() {
+				return 0;
+			}
+
+		};
+	}
+
+	/**
+	 * @return an extractor which converts a point in time into the integer
+	 *         element representing it.
+	 */
+	private static Extractor<Long, Integer> longToInteger() {
+		return new Extractor<Long, Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer extract(Long in) {
+				return in.intValue();
+			}
+		};
+	}
+
+	/**
+	 * Asserts that both arrays have the same length and equal elements at
+	 * every position.
+	 */
+	private void arrayEqualityCheck(Object[] array1, Object[] array2) {
+		assertEquals("The result arrays must have the same length", array1.length, array2.length);
+		int position = 0;
+		for (Object expected : array1) {
+			assertEquals("Unequal fields at position " + position, expected, array2[position]);
+			position++;
+		}
+	}
+
+}