You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:24 UTC

[19/34] incubator-flink git commit: [streaming] Extractor requirement removed for custom timestamp windowing

[streaming] Extractor requirement removed for custom timestamp windowing


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/55ee64f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/55ee64f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/55ee64f6

Branch: refs/heads/master
Commit: 55ee64f6c9824d51922fe4a8fb1435030925b110
Parents: f96ba06
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Nov 24 00:32:52 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Fri Dec 5 16:45:09 2014 +0100

----------------------------------------------------------------------
 .../operator/GroupedWindowingInvokable.java     | 11 +++--
 .../invokable/operator/WindowingInvokable.java  | 10 ++---
 .../streaming/api/windowing/helper/Time.java    |  8 +---
 .../windowing/policy/ActiveEvictionPolicy.java  |  2 +-
 .../policy/ActiveEvictionPolicyWrapper.java     |  5 ++-
 .../windowing/policy/ActiveTriggerCallback.java | 11 ++---
 .../windowing/policy/ActiveTriggerPolicy.java   | 16 +++----
 .../windowing/policy/TimeEvictionPolicy.java    | 46 ++++++++++++++------
 .../api/windowing/policy/TimeTriggerPolicy.java | 42 ++++++++----------
 .../apache/flink/streaming/api/PrintTest.java   |  8 ----
 .../operator/GroupedWindowingInvokableTest.java | 13 +-----
 .../operator/WindowingInvokableTest.java        | 13 +-----
 .../windowing/policy/TimeTriggerPolicyTest.java | 16 ++-----
 13 files changed, 84 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java
index e957b8c..c341e56 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokable.java
@@ -138,8 +138,7 @@ public class GroupedWindowingInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
 	 *            If only one element is contained a group, this element itself
 	 *            is returned as aggregated result.)
 	 */
-	public GroupedWindowingInvokable(Function userFunction,
-			KeySelector<IN, ?> keySelector,
+	public GroupedWindowingInvokable(Function userFunction, KeySelector<IN, ?> keySelector,
 			LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies,
 			LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies,
 			LinkedList<TriggerPolicy<IN>> centralTriggerPolicies) {
@@ -174,8 +173,8 @@ public class GroupedWindowingInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
 
 			// Run the precalls for central active triggers
 			for (ActiveTriggerPolicy<IN> trigger : activeCentralTriggerPolicies) {
-				IN[] result = trigger.preNotifyTrigger(reuse.getObject());
-				for (IN in : result) {
+				Object[] result = trigger.preNotifyTrigger(reuse.getObject());
+				for (Object in : result) {
 					for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
 						group.processFakeElement(in, trigger);
 					}
@@ -305,7 +304,7 @@ public class GroupedWindowingInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
 	 * 
 	 * @see ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)
 	 */
-	private class WindowingCallback implements ActiveTriggerCallback<IN> {
+	private class WindowingCallback implements ActiveTriggerCallback {
 		private ActiveTriggerPolicy<IN> policy;
 
 		public WindowingCallback(ActiveTriggerPolicy<IN> policy) {
@@ -313,7 +312,7 @@ public class GroupedWindowingInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
 		}
 
 		@Override
-		public void sendFakeElement(IN datapoint) {
+		public void sendFakeElement(Object datapoint) {
 			for (WindowingInvokable<IN, OUT> group : windowingGroups.values()) {
 				group.processFakeElement(datapoint, policy);
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
index d0855e6..95a999c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
@@ -101,7 +101,7 @@ public abstract class WindowingInvokable<IN, OUT> extends StreamInvokable<IN, OU
 	 * This class allows the active trigger threads to call back and push fake
 	 * elements at any time.
 	 */
-	private class WindowingCallback implements ActiveTriggerCallback<IN> {
+	private class WindowingCallback implements ActiveTriggerCallback {
 		private ActiveTriggerPolicy<IN> policy;
 
 		public WindowingCallback(ActiveTriggerPolicy<IN> policy) {
@@ -109,7 +109,7 @@ public abstract class WindowingInvokable<IN, OUT> extends StreamInvokable<IN, OU
 		}
 
 		@Override
-		public void sendFakeElement(IN datapoint) {
+		public void sendFakeElement(Object datapoint) {
 			processFakeElement(datapoint, this.policy);
 		}
 
@@ -215,7 +215,7 @@ public abstract class WindowingInvokable<IN, OUT> extends StreamInvokable<IN, OU
 	 * @param currentPolicy
 	 *            the policy which produced this fake element
 	 */
-	protected synchronized void processFakeElement(IN input, TriggerPolicy<IN> currentPolicy) {
+	protected synchronized void processFakeElement(Object input, TriggerPolicy<IN> currentPolicy) {
 
 		// Process the evictions and take care of double evictions
 		// In case there are multiple eviction policies present,
@@ -283,8 +283,8 @@ public abstract class WindowingInvokable<IN, OUT> extends StreamInvokable<IN, OU
 			// of the different fake elements returned by this triggers becomes
 			// a problem. This might lead to unexpected results...
 			// Should we limit the number of active triggers to 0 or 1?
-			IN[] result = trigger.preNotifyTrigger(input);
-			for (IN in : result) {
+			Object[] result = trigger.preNotifyTrigger(input);
+			for (Object in : result) {
 				processFakeElement(in, trigger);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
index 0b73150..86ad952 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
@@ -40,7 +40,6 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 
 	private int timeVal;
 	private TimeUnit granularity;
-	private Extractor<Long, DATA> longToDATAExtractor;
 	private TimeStamp<DATA> timeStamp;
 	private long delay;
 
@@ -59,7 +58,6 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 	private Time(int timeVal, TimeUnit timeUnit) {
 		this.timeVal = timeVal;
 		this.granularity = timeUnit;
-		this.longToDATAExtractor = new NullExtractor<DATA>();
 		this.timeStamp = new DefaultTimeStamp<DATA>();
 		this.delay = 0;
 	}
@@ -71,8 +69,7 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 
 	@Override
 	public TriggerPolicy<DATA> toTrigger() {
-		return new TimeTriggerPolicy<DATA>(granularityInMillis(), timeStamp, delay,
-				longToDATAExtractor);
+		return new TimeTriggerPolicy<DATA>(granularityInMillis(), timeStamp, delay);
 	}
 
 	/**
@@ -95,9 +92,8 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 	}
 
 	@SuppressWarnings("unchecked")
-	public <R> Time<R> withTimeStamp(TimeStamp<R> timeStamp, Extractor<Long, R> extractor) {
+	public <R> Time<R> withTimeStamp(TimeStamp<R> timeStamp) {
 		this.timeStamp = (TimeStamp<DATA>) timeStamp;
-		this.longToDATAExtractor = (Extractor<Long, DATA>) extractor;
 		return (Time<R>) this;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
index 17d4914..fe172bc 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
@@ -47,6 +47,6 @@ public interface ActiveEvictionPolicy<DATA> extends EvictionPolicy<DATA> {
 	 * @return the number of elements to delete from the buffer (only real
 	 *         elements are counted)
 	 */
-	public int notifyEvictionWithFakeElement(DATA datapoint, int bufferSize);
+	public int notifyEvictionWithFakeElement(Object datapoint, int bufferSize);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
index a110fbc..b3b6935 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
@@ -55,9 +55,10 @@ public class ActiveEvictionPolicyWrapper<DATA> implements ActiveEvictionPolicy<D
 		return nestedPolicy.notifyEviction(datapoint, triggered, bufferSize);
 	}
 
+	@SuppressWarnings("unchecked")
 	@Override
-	public int notifyEvictionWithFakeElement(DATA datapoint, int bufferSize) {
-		return nestedPolicy.notifyEviction(datapoint, true, bufferSize);
+	public int notifyEvictionWithFakeElement(Object datapoint, int bufferSize) {
+		return nestedPolicy.notifyEviction((DATA) datapoint, true, bufferSize);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
index d74a3ae..c44be37 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
@@ -20,18 +20,15 @@ package org.apache.flink.streaming.api.windowing.policy;
 /**
  * In case an {@link ActiveTriggerPolicy} is used, it can implement own
  * {@link Runnable} classes. Such {@link Runnable} classes will be executed as
- * an own thread and can submit fake elements, to the element
- * buffer at any time.
+ * a separate thread and can submit fake elements to the element buffer at
+ * any time.
  * 
  * The factory method for runnables of the {@link ActiveTriggerPolicy} gets an
  * instance of this interface as parameter. The describes adding of elements can
  * be done by the runnable using the methods provided in this interface.
  * 
- * @param <DATA>
- *            The data type which can be consumed by the methods provided in
- *            this callback implementation.
  */
-public interface ActiveTriggerCallback<DATA> {
+public interface ActiveTriggerCallback {
 
 	/**
 	 * Submits a new fake data point to the element buffer. Such a fake element
@@ -43,6 +40,6 @@ public interface ActiveTriggerCallback<DATA> {
 	 * @param datapoint
 	 *            the fake data point to be added
 	 */
-	public void sendFakeElement(DATA datapoint);
+	public void sendFakeElement(Object datapoint);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
index f549766..a8a704d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
@@ -24,12 +24,12 @@ import org.apache.flink.streaming.api.invokable.util.TimeStamp;
  * for active triggers. Active triggers can act in two ways:
  * 
  * 1) Whenever an element arrives at the invokable, the
- * {@link ActiveTriggerPolicy#preNotifyTrigger(Object)} method gets called first.
- * It can return zero ore more fake data points which will be added before the
- * the currently arrived real element gets processed. This allows to handle
- * empty windows in time based windowing with an user defined {@link TimeStamp}.
- * Triggers are not called on fake datapoint. A fake datapoint is always
- * considered as triggered.
+ * {@link ActiveTriggerPolicy#preNotifyTrigger(Object)} method gets called
+ * first. It can return zero or more fake data points which will be added
+ * before the currently arrived real element gets processed. This allows
+ * handling empty windows in time-based windowing with a user-defined
+ * {@link TimeStamp}. Triggers are not called on fake data points. A fake
+ * data point is always considered as triggered.
  * 
  * 2) An active trigger has a factory method for a runnable. This factory method
  * gets called at the start up of the invokable. The returned runnable will be
@@ -57,7 +57,7 @@ public interface ActiveTriggerPolicy<DATA> extends TriggerPolicy<DATA> {
 	 * @return zero ore more fake data points which will be added before the the
 	 *         currently arrived real element gets processed.
 	 */
-	public DATA[] preNotifyTrigger(DATA datapoint);
+	public Object[] preNotifyTrigger(DATA datapoint);
 
 	/**
 	 * This is the factory method for a runnable. This factory method gets
@@ -76,6 +76,6 @@ public interface ActiveTriggerPolicy<DATA> extends TriggerPolicy<DATA> {
 	 *         executed as an own thread and can submit fake elements, to the
 	 *         element buffer at any time.
 	 */
-	public Runnable createActiveTriggerRunnable(ActiveTriggerCallback<DATA> callback);
+	public Runnable createActiveTriggerRunnable(ActiveTriggerCallback callback);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
index b212293..e886c09 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
@@ -69,25 +69,49 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
 		this.granularity = granularity;
 	}
 
+	@SuppressWarnings("unchecked")
 	@Override
-	public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) {
-		return notifyEviction(datapoint, triggered, bufferSize, false);
+	public int notifyEvictionWithFakeElement(Object datapoint, int bufferSize) {
+		checkForDeleted(bufferSize);
+
+		long threshold;
+		try {
+			threshold = (Long) datapoint - granularity;
+		} catch (ClassCastException e) {
+			threshold = timestamp.getTimestamp((DATA) datapoint) - granularity;
+		}
+
+		// return result
+		return deleteAndCountExpired(threshold);
+
 	}
 
 	@Override
-	public int notifyEvictionWithFakeElement(DATA datapoint, int bufferSize) {
-		return notifyEviction(datapoint, true, bufferSize, true);
+	public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) {
+
+		checkForDeleted(bufferSize);
+
+		// delete and count expired tuples
+		long threshold = timestamp.getTimestamp(datapoint) - granularity;
+		int counter = deleteAndCountExpired(threshold);
+
+		// Add current element to buffer
+		buffer.add(datapoint);
+
+		// return result
+		return counter;
+
 	}
 
-	private int notifyEviction(DATA datapoint, boolean triggered, int bufferSize, boolean isFake) {
+	private void checkForDeleted(int bufferSize) {
 		// check for deleted tuples (deletes by other policies)
 		while (bufferSize < this.buffer.size()) {
 			this.buffer.removeFirst();
 		}
+	}
 
-		// delete and count expired tuples
+	private int deleteAndCountExpired(long threshold) {
 		int counter = 0;
-		long threshold = timestamp.getTimestamp(datapoint) - granularity;
 		while (!buffer.isEmpty()) {
 
 			if (timestamp.getTimestamp(buffer.getFirst()) < threshold) {
@@ -97,14 +121,8 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
 				break;
 			}
 		}
-
-		if (!isFake) {
-			// Add current element to buffer
-			buffer.add(datapoint);
-		}
-
-		// return result
 		return counter;
+
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
index 1dd713f..c3402c9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -21,7 +21,6 @@ import java.util.LinkedList;
 
 import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
 
 /**
  * This trigger policy triggers with regard to the time. The is measured using a
@@ -46,8 +45,6 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 	protected TimeStamp<DATA> timestamp;
 	protected long delay;
 
-	private Extractor<Long, DATA> longToDATAExtractor;
-
 	/**
 	 * This trigger policy triggers with regard to the time. The is measured
 	 * using a given {@link TimeStamp} implementation. A point in time is always
@@ -67,17 +64,16 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 	 *            extractor should wrap a long into such an element of type
 	 *            DATA.
 	 */
-	public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp,
-			Extractor<Long, DATA> timeWrapper) {
-		this(granularity, timestamp, 0, timeWrapper);
+	public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp) {
+		this(granularity, timestamp, 0);
 	}
 
 	/**
 	 * This is mostly the same as
-	 * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimeStamp)}. In
-	 * addition to granularity and timestamp a delay can be specified for the
-	 * first trigger. If the start time given by the timestamp is x, the delay
-	 * is y, and the granularity is z, the first trigger will happen at x+y+z.
+	 * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimeStamp)}. In addition
+	 * to granularity and timestamp a delay can be specified for the first
+	 * trigger. If the start time given by the timestamp is x, the delay is y,
+	 * and the granularity is z, the first trigger will happen at x+y+z.
 	 * 
 	 * @param granularity
 	 *            The granularity of the trigger. If this value is set to 2 the
@@ -95,31 +91,27 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 	 *            extractor should wrap a long into such an element of type
 	 *            DATA.
 	 */
-	public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp, long delay,
-			Extractor<Long, DATA> timeWrapper) {
+	public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp, long delay) {
 		this.startTime = timestamp.getStartTime() + delay;
 		this.timestamp = timestamp;
 		this.granularity = granularity;
 		this.delay = delay;
-		this.longToDATAExtractor = timeWrapper;
-
 	}
 
 	/**
 	 * This method checks if we missed a window end. If this is the case we
 	 * trigger the missed windows using fake elements.
 	 */
-	@SuppressWarnings("unchecked")
 	@Override
-	public synchronized DATA[] preNotifyTrigger(DATA datapoint) {
-		LinkedList<DATA> fakeElements = new LinkedList<DATA>();
+	public synchronized Object[] preNotifyTrigger(DATA datapoint) {
+		LinkedList<Object> fakeElements = new LinkedList<Object>();
 		// check if there is more then one window border missed
 		// use > here. In case >= would fit, the regular call will do the job.
 		while (timestamp.getTimestamp(datapoint) > startTime + granularity) {
 			startTime += granularity;
-			fakeElements.add(longToDATAExtractor.extract(startTime));
+			fakeElements.add(startTime);
 		}
-		return (DATA[]) fakeElements.toArray();
+		return (Object[]) fakeElements.toArray();
 	}
 
 	/**
@@ -134,7 +126,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 	 *         time. If any other time measure is used the method return null.
 	 */
 	@Override
-	public Runnable createActiveTriggerRunnable(ActiveTriggerCallback<DATA> callback) {
+	public Runnable createActiveTriggerRunnable(ActiveTriggerCallback callback) {
 		if (this.timestamp instanceof DefaultTimeStamp) {
 			return new TimeCheck(callback);
 		} else {
@@ -149,19 +141,19 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 	 * @param callback
 	 *            The callback object.
 	 */
-	private synchronized void activeFakeElementEmission(ActiveTriggerCallback<DATA> callback) {
+	private synchronized void activeFakeElementEmission(ActiveTriggerCallback callback) {
 
 		if (System.currentTimeMillis() >= startTime + granularity) {
 			startTime += granularity;
-			callback.sendFakeElement(longToDATAExtractor.extract(startTime));
+			callback.sendFakeElement(startTime);
 		}
 
 	}
 
 	private class TimeCheck implements Runnable {
-		ActiveTriggerCallback<DATA> callback;
+		ActiveTriggerCallback callback;
 
-		public TimeCheck(ActiveTriggerCallback<DATA> callback) {
+		public TimeCheck(ActiveTriggerCallback callback) {
 			this.callback = callback;
 		}
 
@@ -198,7 +190,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 
 	@Override
 	public TimeTriggerPolicy<DATA> clone() {
-		return new TimeTriggerPolicy<DATA>(granularity, timestamp, delay, longToDATAExtractor);
+		return new TimeTriggerPolicy<DATA>(granularity, timestamp, delay);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index c494d5f..f7ba82e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -81,14 +81,6 @@ public class PrintTest implements Serializable {
 					public long getStartTime() {
 						return 1;
 					}
-				}, new Extractor<Long, Integer>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Integer extract(Long in) {
-						return in.intValue();
-					}
 				})).every(Count.of(2)).reduceGroup(new GroupReduceFunction<Integer, String>() {
 
 					@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
index ebfed05..79288d1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
@@ -28,14 +28,13 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
 import org.apache.flink.streaming.api.windowing.policy.ActiveCloneableEvictionPolicyWrapper;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
 import org.apache.flink.streaming.util.MockInvokable;
@@ -249,15 +248,7 @@ public class GroupedWindowingInvokableTest {
 		LinkedList<TriggerPolicy<Tuple2<Integer, String>>> triggers = new LinkedList<TriggerPolicy<Tuple2<Integer, String>>>();
 		// Trigger every 2 time units but delay the first trigger by 2 (First
 		// trigger after 4, then every 2)
-		triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStamp, 2L,
-				new Extractor<Long, Tuple2<Integer, String>>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Tuple2<Integer, String> extract(Long in) {
-						return new Tuple2<Integer, String>(in.intValue(), null);
-					}
-				}));
+		triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStamp, 2L));
 
 		LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>> evictions = new LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>>();
 		// Always delete all elements older then 4

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
index 8e96a7c..32e71ba 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
@@ -25,12 +25,11 @@ import java.util.List;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.util.MockInvokable;
 import org.junit.Test;
@@ -89,16 +88,8 @@ public class WindowingInvokableTest {
 		LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>();
 		// Trigger every 2 time units but delay the first trigger by 2 (First
 		// trigger after 4, then every 2)
-		triggers.add(new TimeTriggerPolicy<Integer>(2L, myTimeStamp, 2L,
-				new Extractor<Long, Integer>() {
-
-					private static final long serialVersionUID = 1L;
+		triggers.add(new TimeTriggerPolicy<Integer>(2L, myTimeStamp, 2L));
 
-					@Override
-					public Integer extract(Long in) {
-						return in.intValue();
-					}
-				}));
 		LinkedList<EvictionPolicy<Integer>> evictions = new LinkedList<EvictionPolicy<Integer>>();
 		// Always delete all elements older then 4
 		evictions.add(new TimeEvictionPolicy<Integer>(4L, myTimeStamp));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/55ee64f6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
index 34e292a..1355b27 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
@@ -22,8 +22,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
-import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.junit.Test;
 
 public class TimeTriggerPolicyTest {
@@ -53,7 +51,7 @@ public class TimeTriggerPolicyTest {
 		for (long granularity = 0; granularity < 31; granularity++) {
 			// create policy
 			TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity,
-					timeStamp, new Time.NullExtractor<Integer>());
+					timeStamp);
 
 			// remember window border
 			// Remark: This might NOT work in case the timeStamp uses
@@ -104,18 +102,10 @@ public class TimeTriggerPolicyTest {
 
 		// create policy
 		TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5,
-				timeStamp, new Extractor<Long, Integer>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Integer extract(Long in) {
-						return in.intValue();
-					}
-				});
+				timeStamp);
 
 		// expected result
-		Integer[][] result = { {}, {}, { 5, 10, 15 }, { 25 } };
+		Long[][] result = { {}, {}, { 5L, 10L, 15L }, { 25L } };
 
 		// call policy
 		for (int i = 0; i < times.length; i++) {