You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:14 UTC

[09/34] incubator-flink git commit: [streaming] Created active trigger and eviction policies based on time. Included respective policy helper and test cases.

[streaming] Created active trigger and eviction policies based on time. Included respective policy helper and test cases.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6f3bf576
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6f3bf576
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6f3bf576

Branch: refs/heads/master
Commit: 6f3bf576c68e1884393016ef46297236367522eb
Parents: 40c3ae4
Author: Jonas Traub (powibol) <jo...@s-traub.com>
Authored: Mon Oct 27 13:31:00 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Fri Dec 5 16:45:08 2014 +0100

----------------------------------------------------------------------
 .../streaming/api/windowing/helper/Time.java    | 125 ++++++++++++
 .../windowing/policy/TimeEvictionPolicy.java    | 109 +++++++++++
 .../api/windowing/policy/TimeTriggerPolicy.java | 193 +++++++++++++++++++
 .../policy/TimeEvictionPolicyTest.java          | 125 ++++++++++++
 .../windowing/policy/TimeTriggerPolicyTest.java | 137 +++++++++++++
 5 files changed, 689 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6f3bf576/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
new file mode 100644
index 0000000..37f2902
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.helper;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+/**
+ * This helper represents a time based trigger or eviction policy. By default
+ * the time is measured with {@link System#currentTimeMillis()} in
+ * {@link DefaultTimeStamp}.
+ * 
+ * @param <DATA>
+ *            The data type which is handled by the time stamp used in the
+ *            policy represented by this helper
+ */
+public class Time<DATA> implements WindowingHelper<DATA> {
+
+	private int timeVal;
+	private TimeUnit granularity;
+	private Extractor<Long, DATA> timeToData;
+
+	/**
+	 * Creates a helper representing a trigger which triggers every given
+	 * timeVal or an eviction which evicts all elements older than timeVal.
+	 * 
+	 * @param timeVal
+	 *            The number of time units
+	 * @param granularity
+	 *            The unit of time such as minute or millisecond. Note that
+	 *            the smallest possible granularity is milliseconds. Any smaller
+	 *            time unit might cause an error at runtime due to conversion
+	 *            problems.
+	 * @param timeToData
+	 *            This policy creates fake elements to not miss windows in case
+	 *            no element arrived within the duration of the window. This
+	 *            extractor should wrap a long into such an element of type
+	 *            DATA.
+	 */
+	public Time(int timeVal, TimeUnit granularity, Extractor<Long, DATA> timeToData) {
+		this.timeVal = timeVal;
+		this.granularity = granularity;
+		this.timeToData = timeToData;
+	}
+
+	/**
+	 * Creates a helper representing a trigger which triggers every given
+	 * timeVal or an eviction which evicts all elements older than timeVal.
+	 * 
+	 * The default granularity for timeVal used in this constructor is seconds.
+	 * 
+	 * @param timeVal
+	 *            The number of time units measured in seconds.
+	 * @param timeToData
+	 *            This policy creates fake elements to not miss windows in case
+	 *            no element arrived within the duration of the window. This
+	 *            extractor should wrap a long into such an element of type
+	 *            DATA.
+	 */
+	public Time(int timeVal, Extractor<Long, DATA> timeToData) {
+		this(timeVal, TimeUnit.SECONDS, timeToData);
+	}
+
+	@Override
+	public EvictionPolicy<DATA> toEvict() {
+		return new TimeEvictionPolicy<DATA>(granularityInMillis(), new DefaultTimeStamp<DATA>());
+	}
+
+	@Override
+	public TriggerPolicy<DATA> toTrigger() {
+		return new TimeTriggerPolicy<DATA>(granularityInMillis(), new DefaultTimeStamp<DATA>(),
+				timeToData);
+	}
+
+	/**
+	 * Creates a helper representing a trigger which triggers every given
+	 * timeVal or an eviction which evicts all elements older than timeVal.
+	 * 
+	 * @param timeVal
+	 *            The number of time units
+	 * @param granularity
+	 *            The unit of time such as minute or millisecond. Note that
+	 *            the smallest possible granularity is milliseconds. Any smaller
+	 *            time unit might cause an error at runtime due to conversion
+	 *            problems.
+	 * @param timeToData
+	 *            This policy creates fake elements to not miss windows in case
+	 *            no element arrived within the duration of the window. This
+	 *            extractor should wrap a long into such an element of type
+	 *            DATA.
+	 * @return a helper representing a trigger which triggers every given
+	 *         timeVal or an eviction which evicts all elements older than
+	 *         timeVal.
+	 */
+	public static <DATA> Time<DATA> of(int timeVal, TimeUnit granularity,
+			Extractor<Long, DATA> timeToData) {
+		return new Time<DATA>(timeVal, granularity, timeToData);
+	}
+
+	private long granularityInMillis() {
+		return this.granularity.toMillis(this.timeVal);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6f3bf576/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
new file mode 100644
index 0000000..a78cc4e
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import java.util.LinkedList;
+
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+
+/**
+ * This eviction policy evicts all elements which are older then a specified
+ * time. The time is measured using a given {@link TimeStamp} implementation. A
+ * point in time is always represented as long. Therefore, the granularity can
+ * be set as long value as well.
+ *
+ * @param <DATA>
+ *            The type of the incoming data points which are processed by this
+ *            policy.
+ */
+public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA> {
+
+	/**
+	 * auto generated version id
+	 */
+	private static final long serialVersionUID = -1457476766124518220L;
+
+	private long granularity;
+	private TimeStamp<DATA> timestamp;
+	private LinkedList<DATA> buffer = new LinkedList<DATA>();
+
+	/**
+	 * This eviction policy evicts all elements which are older than a specified
+	 * time. The time is measured using a given {@link TimeStamp}
+	 * implementation. A point in time is always represented as long. Therefore,
+	 * the granularity can be set as long value as well. If this value is set to
+	 * 2 the policy will evict all elements which are older as 2.
+	 * 
+	 * <code>
+	 *   while (time(firstInBuffer)<current time-granularity){
+	 *   	evict firstInBuffer;
+	 *   }
+	 * </code>
+	 * 
+	 * @param granularity
+	 *            The granularity of the eviction. If this value is set to 2 the
+	 *            policy will evict all elements which are older as 2(if
+	 *            (time(X)<current time-granularity) evict X).
+	 * @param timestamp
+	 *            The {@link TimeStamp} to measure the time with. This can be
+	 *            either user defined of provided by the API.
+	 */
+	public TimeEvictionPolicy(long granularity, TimeStamp<DATA> timestamp) {
+		this.timestamp = timestamp;
+		this.granularity = granularity;
+	}
+
+	@Override
+	public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) {
+		return notifyEviction(datapoint, triggered, bufferSize, false);
+	}
+
+	@Override
+	public int notifyEvictionWithFakeElement(DATA datapoint, int bufferSize) {
+		return notifyEviction(datapoint, true, bufferSize, true);
+	}
+
+	private int notifyEviction(DATA datapoint, boolean triggered, int bufferSize, boolean isFake) {
+		// check for deleted tuples (deletes by other policies)
+		while (bufferSize < this.buffer.size()) {
+			this.buffer.removeFirst();
+		}
+
+		// delete and count expired tuples
+		int counter = 0;
+		while (!buffer.isEmpty()) {
+
+			if (timestamp.getTimestamp(buffer.getFirst()) < timestamp.getTimestamp(datapoint)
+					- granularity) {
+				buffer.removeFirst();
+				counter++;
+			} else {
+				break;
+			}
+		}
+
+		if (!isFake) {
+			// Add current element to buffer
+			buffer.add(datapoint);
+		}
+
+		// return result
+		return counter;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6f3bf576/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
new file mode 100644
index 0000000..d50e285
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import java.util.LinkedList;
+
+import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+
+/**
+ * This trigger policy triggers with regard to the time. The time is measured
+ * using a given {@link TimeStamp} implementation. A point in time is always
+ * represented as long. Therefore, parameters such as granularity and delay can
+ * be set as long value as well.
+ * 
+ * @param <DATA>
+ *            The type of the incoming data points which are processed by this
+ *            policy.
+ */
+public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA> {
+
+	/**
+	 * auto generated version id
+	 */
+	private static final long serialVersionUID = -5122753802440196719L;
+
+	private long startTime;
+	private long granularity;
+	private TimeStamp<DATA> timestamp;
+	private Extractor<Long, DATA> longToDATAExtractor;
+
+	/**
+	 * This trigger policy triggers with regard to the time. The time is
+	 * measured using a given {@link TimeStamp} implementation. A point in time
+	 * is always represented as long, so the granularity is a long value as
+	 * well. With a granularity of 2, for example, the policy triggers at every
+	 * second point in time. The first trigger is not delayed (delay 0).
+	 * 
+	 * @param granularity
+	 *            The granularity of the trigger. If this value is set to 2 the
+	 *            policy will trigger at every second time point
+	 * @param timestamp
+	 *            The {@link TimeStamp} to measure the time with. This can be
+	 *            either user defined or provided by the API.
+	 * @param timeWrapper
+	 *            This policy creates fake elements to not miss windows in case
+	 *            no element arrived within the duration of the window. This
+	 *            extractor should wrap a long into such an element of type
+	 *            DATA.
+	 */
+	public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp,
+			Extractor<Long, DATA> timeWrapper) {
+		this(granularity, timestamp, 0, timeWrapper);
+	}
+
+	/**
+	 * This is mostly the same as
+	 * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimeStamp, Extractor)}.
+	 * In addition to granularity and timestamp a delay can be specified for the
+	 * first trigger. If the start time given by the timestamp is x, the delay
+	 * is y, and the granularity is z, the first trigger will happen at x+y+z.
+	 * 
+	 * @param granularity
+	 *            The granularity of the trigger. If this value is set to 2 the
+	 *            policy will trigger at every second time point
+	 * @param timestamp
+	 *            The {@link TimeStamp} to measure the time with. This can be
+	 *            either user defined or provided by the API.
+	 * @param delay
+	 *            A delay for the first trigger. If the start time given by the
+	 *            timestamp is x, the delay is y, and the granularity is z, the
+	 *            first trigger will happen at x+y+z.
+	 * @param timeWrapper
+	 *            This policy creates fake elements to not miss windows in case
+	 *            no element arrived within the duration of the window. This
+	 *            extractor should wrap a long into such an element of type
+	 *            DATA.
+	 */
+	public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp, long delay,
+			Extractor<Long, DATA> timeWrapper) {
+		this.startTime = timestamp.getStartTime() + delay;
+		this.timestamp = timestamp;
+		this.granularity = granularity;
+		this.longToDATAExtractor = timeWrapper;
+	}
+
+	@Override
+	public synchronized boolean notifyTrigger(DATA datapoint) {
+		final long now = timestamp.getTimestamp(datapoint);
+		// The window end is exclusive: a record on the border already triggers.
+		if (now < startTime + granularity) {
+			return false;
+		}
+		if (granularity != 0) {
+			// align the border with the start of the window containing the record
+			startTime = now - ((now - startTime) % granularity);
+		}
+		return true;
+	}
+
+	/**
+	 * Checks whether window ends were missed before the given data point and
+	 * returns one fake element per missed window border (empty if none).
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public synchronized DATA[] preNotifyTrigger(DATA datapoint) {
+		LinkedList<DATA> fakeElements = new LinkedList<DATA>();
+		// Use > here: if >= would fit, the regular notifyTrigger call does the
+		// job. A granularity of 0 never advances startTime, so skip the loop.
+		while (granularity != 0 && timestamp.getTimestamp(datapoint) > startTime + granularity) {
+			fakeElements.add(longToDATAExtractor.extract(startTime += granularity));
+		}
+		return (DATA[]) fakeElements.toArray();
+	}
+
+	/**
+	 * Creates the runnable for active triggering. A runnable which triggers
+	 * based on the current system time is returned only if a
+	 * {@link DefaultTimeStamp} is used; otherwise the method returns null.
+	 * 
+	 * @param callback
+	 *            The object which takes the callbacks for adding fake elements
+	 *            out of the runnable.
+	 * @return A runnable which triggers based on the current system time, or
+	 *         null if any time measure other than the default one is used.
+	 */
+	@Override
+	public Runnable createActiveTriggerRunnable(ActiveTriggerCallback<DATA> callback) {
+		if (!(this.timestamp instanceof DefaultTimeStamp)) {
+			// active triggering is only provided for the default time stamp
+			return null;
+		}
+		return new TimeCheck(callback);
+	}
+
+	/**
+	 * Called by the active trigger runnable. If the system time has reached the
+	 * end of the current window, the window border is advanced by one
+	 * granularity and a fake element carrying the new border is emitted.
+	 * 
+	 * @param callback
+	 *            The callback object which receives the fake element.
+	 */
+	private synchronized void activeFakeElementEmission(ActiveTriggerCallback<DATA> callback) {
+		if (System.currentTimeMillis() >= startTime + granularity) {
+			// Advance the border exactly once so that no window is skipped.
+			startTime += granularity;
+			callback.sendFakeElement(longToDATAExtractor.extract(startTime));
+		}
+	}
+
+	/** Runnable which sleeps for one granularity and then lets the policy emit a due fake element. */
+	private class TimeCheck implements Runnable {
+		ActiveTriggerCallback<DATA> callback;
+
+		public TimeCheck(ActiveTriggerCallback<DATA> callback) {
+			this.callback = callback;
+		}
+
+		@Override
+		public void run() {
+			while (true) {
+				try {
+					Thread.sleep(granularity);
+				} catch (InterruptedException e) {
+					// restore the flag and stop instead of looping forever
+					Thread.currentThread().interrupt();
+					return;
+				}
+				// synchronized call: no races between real and fake elements
+				activeFakeElementEmission(callback);
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6f3bf576/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
new file mode 100644
index 0000000..303c66a
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import java.util.LinkedList;
+
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TimeEvictionPolicyTest {
+
+	@Test
+	public void timeEvictionTest() {
+		// create some test data
+		Integer[] times = { 1, 3, 4, 6, 7, 9, 14, 20, 21, 22, 30, 31, 33, 36, 40, 41, 42, 43, 44,
+				45, 47, 55 };
+		Integer[] numToDelete = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 0, 0, 0, 0, 3 };
+
+		// create a timestamp which uses the element value as its time
+		@SuppressWarnings("serial")
+		TimeStamp<Integer> timeStamp = new TimeStamp<Integer>() {
+
+			@Override
+			public long getTimestamp(Integer value) {
+				return value;
+			}
+
+			@Override
+			public long getStartTime() {
+				return 0;
+			}
+
+		};
+
+		// test different granularity
+		for (long granularity = 0; granularity < 40; granularity++) {
+			// create policy
+			TimeEvictionPolicy<Integer> policy = new TimeEvictionPolicy<Integer>(granularity,
+					timeStamp);
+
+			// The trigger status should not affect the policy. Therefore, its
+			// value is changed after each usage.
+			boolean triggered = false;
+
+			// The eviction should work similar with both, fake and real
+			// elements. Which kind is used is changed on every 3rd element in
+			// this test.
+			int fakeAndRealCounter = 0;
+			boolean fake = false;
+
+			// test by adding values
+			LinkedList<Integer> buffer = new LinkedList<Integer>();
+			for (int i = 0; i < times.length; i++) {
+
+				// check if the current element should be a fake
+				fakeAndRealCounter++;
+				if (fakeAndRealCounter > 2) {
+					fake = !fake;
+					fakeAndRealCounter = 0;
+				}
+
+				int result;
+
+				if (fake) {
+					// Notify eviction with fake element
+					result = policy.notifyEvictionWithFakeElement(times[i], buffer.size());
+				} else {
+					// Notify eviction with real element
+					result = policy.notifyEviction(times[i], (triggered = !triggered),
+							buffer.size());
+				}
+
+				// handle correctness of eviction
+				for (; result > 0 && !buffer.isEmpty(); result--) {
+					if (buffer.getFirst() < times[i] - granularity) {
+						buffer.removeFirst();
+					} else {
+						fail("The policy wanted to evict time " + buffer.getFirst()
+								+ " while the current time was " + times[i]
+								+ " and the granularity was " + granularity);
+					}
+				}
+
+				// test that all required evictions have been done
+				if (!buffer.isEmpty()) {
+					assertTrue("The policy did not evict " + buffer.getFirst()
+							+ " while the current time was " + times[i]
+							+ " and the granularity was " + granularity,
+							(buffer.getFirst() >= times[i] - granularity));
+				}
+
+				// test influence of other evictions
+				for (int j = numToDelete[i % numToDelete.length]; j > 0; j--) {
+					if (!buffer.isEmpty()) {
+						buffer.removeFirst();
+					}
+				}
+
+				// add current element to buffer if it is no fake
+				if (!fake) {
+					buffer.add(times[i]);
+				}
+
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6f3bf576/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
new file mode 100644
index 0000000..839564d
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.policy;
+
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TimeTriggerPolicyTest {
+
+	@Test
+	public void timeTriggerRegularNotifyTest() {
+		// create some test data (time stamps of the arriving elements)
+		Integer[] times = { 1, 3, 4, 6, 7, 9, 14, 20, 21, 22, 30 };
+
+		// create a timestamp which uses the element value as its time
+		@SuppressWarnings("serial")
+		TimeStamp<Integer> timeStamp = new TimeStamp<Integer>() {
+
+			@Override
+			public long getTimestamp(Integer value) {
+				return value;
+			}
+
+			@Override
+			public long getStartTime() {
+				return 0;
+			}
+
+		};
+
+		// test different granularity
+		for (long granularity = 0; granularity < 31; granularity++) {
+			// create policy
+			TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity, timeStamp,
+					new Extractor<Long, Integer>() {
+						private static final long serialVersionUID = 1L;
+
+						@Override
+						public Integer extract(Long in) {
+							return in.intValue();
+						}
+					});
+
+			// remember window border
+			// Remark: This might NOT work in case the timeStamp uses
+			// System.currentTimeMillis() to determine the start time.
+			long currentTime = timeStamp.getStartTime();
+
+			// test by adding values
+			for (int i = 0; i < times.length; i++) {
+				boolean result = policy.notifyTrigger(times[i]);
+				// start time is included, but end time is excluded: >=
+				if (times[i] >= currentTime + granularity) {
+					if (granularity != 0) {
+						currentTime = times[i] - ((times[i] - currentTime) % granularity);
+					}
+					assertTrue("The policy did not trigger at pos " + i + " (current time border: "
+							+ currentTime + "; current granularity: " + granularity
+							+ "; data point time: " + times[i] + ")", result);
+				} else {
+					assertFalse("The policy triggered wrong at pos " + i
+							+ " (current time border: " + currentTime + "; current granularity: "
+							+ granularity + "; data point time: " + times[i] + ")", result);
+				}
+			}
+		}
+
+	}
+
+	@Test
+	public void timeTriggerPreNotifyTest() {
+		// create some test data (time stamps of the arriving elements)
+		Integer[] times = { 1, 3, 20, 26};
+
+		// create a timestamp which uses the element value as its time
+		@SuppressWarnings("serial")
+		TimeStamp<Integer> timeStamp = new TimeStamp<Integer>() {
+
+			@Override
+			public long getTimestamp(Integer value) {
+				return value;
+			}
+
+			@Override
+			public long getStartTime() {
+				return 0;
+			}
+
+		};
+
+		// create policy with granularity 5 (the start time is 0)
+		TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5, timeStamp,
+				new Extractor<Long, Integer>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Integer extract(Long in) {
+						return in.intValue();
+					}
+				});
+
+		// expected fake elements per data point: the missed window borders
+		Integer[][] result={{},{},{5,10,15},{25}};
+		
+		// call policy: pre-notification first, then the regular notification
+		for (int i=0;i<times.length;i++){
+			arrayEqualityCheck(result[i],policy.preNotifyTrigger(times[i]));
+			policy.notifyTrigger(times[i]);
+		}
+	}
+	
+	private void arrayEqualityCheck(Object[] expected, Object[] actual) {
+		assertEquals("The result arrays must have the same length", expected.length, actual.length);
+		for (int pos = 0; pos < expected.length; pos++) {
+			assertEquals("Unequal fields at position " + pos, expected[pos], actual[pos]);
+		}
+	}
+
+}