You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:19 UTC

[14/34] incubator-flink git commit: [streaming] New windowing API merge and cleanup + several minor fixes

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
deleted file mode 100755
index 0135b34..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-
-public class WindowReduceInvokable<OUT> extends BatchReduceInvokable<OUT> {
-	private static final long serialVersionUID = 1L;
-	protected long startTime;
-	protected long nextRecordTime;
-	protected TimeStamp<OUT> timestamp;
-	protected StreamWindow window;
-
-	public WindowReduceInvokable(ReduceFunction<OUT> reduceFunction, long windowSize,
-			long slideInterval, TimeStamp<OUT> timestamp) {
-		super(reduceFunction, windowSize, slideInterval);
-		this.timestamp = timestamp;
-		this.startTime = timestamp.getStartTime();
-	}
-
-	@Override
-	public void open(Configuration config) throws Exception {
-		super.open(config);
-		this.window = new StreamWindow();
-		this.batch = this.window;
-		if (timestamp instanceof DefaultTimeStamp) {
-			(new TimeCheck()).start();
-		}
-	}
-
-	protected class StreamWindow extends StreamBatch {
-
-		private static final long serialVersionUID = 1L;
-
-		public StreamWindow() {
-			super();
-
-		}
-
-		@Override
-		public void reduceToBuffer(OUT nextValue) throws Exception {
-
-			checkWindowEnd(timestamp.getTimestamp(nextValue));
-
-			if (currentValue != null) {
-				currentValue = reducer.reduce(serializer.copy(currentValue), serializer.copy(nextValue));
-			} else {
-				currentValue = nextValue;
-			}
-		}
-
-		protected synchronized void checkWindowEnd(long timeStamp) {
-			nextRecordTime = timeStamp;
-
-			while (miniBatchEnd()) {
-				addToBuffer();
-				if (batchEnd()) {
-					reduceBatch();
-				}
-			}
-		}
-		
-		@Override
-		public void reduceBatch() {
-			reduce(this);
-		}
-
-		@Override
-		protected boolean miniBatchEnd() {
-			if (nextRecordTime < startTime + granularity) {
-				return false;
-			} else {
-				startTime += granularity;
-				return true;
-			}
-		}
-
-		@Override
-		public boolean batchEnd() {
-			if (minibatchCounter == numberOfBatches) {
-				minibatchCounter -= batchPerSlide;
-				return true;
-			}
-			return false;
-		}
-
-	}
-
-	private class TimeCheck extends Thread {
-		@Override
-		public void run() {
-			while (true) {
-				try {
-					Thread.sleep(slideSize);
-				} catch (InterruptedException e) {
-				}
-				if (isRunning) {
-					window.checkWindowEnd(System.currentTimeMillis());
-				} else {
-					break;
-				}
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingGroupInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingGroupInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingGroupInvokable.java
new file mode 100644
index 0000000..2752fd1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingGroupInvokable.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.LinkedList;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+public class WindowingGroupInvokable<IN, OUT> extends WindowingInvokable<IN, OUT> {
+
+	private static final long serialVersionUID = 1L;
+	GroupReduceFunction<IN, OUT> reducer;
+
+	public WindowingGroupInvokable(GroupReduceFunction<IN, OUT> userFunction,
+			LinkedList<TriggerPolicy<IN>> triggerPolicies,
+			LinkedList<EvictionPolicy<IN>> evictionPolicies) {
+		super(userFunction, triggerPolicies, evictionPolicies);
+		this.reducer = userFunction;
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+		reducer.reduce(buffer, collector);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
index 29975f4..d0855e6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokable.java
@@ -17,8 +17,12 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
@@ -28,12 +32,7 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, String[]>> {
+public abstract class WindowingInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 
 	/**
 	 * Auto-generated serial version UID
@@ -47,9 +46,8 @@ public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, Strin
 	private LinkedList<ActiveTriggerPolicy<IN>> activeTriggerPolicies;
 	private LinkedList<ActiveEvictionPolicy<IN>> activeEvictionPolicies;
 	private LinkedList<Thread> activePolicyTreads = new LinkedList<Thread>();
-	private LinkedList<IN> buffer = new LinkedList<IN>();
+	protected LinkedList<IN> buffer = new LinkedList<IN>();
 	private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
-	private ReduceFunction<IN> reducer;
 
 	/**
 	 * This constructor created a windowing invokable using trigger and eviction
@@ -64,12 +62,10 @@ public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, Strin
 	 *            A list of {@link EvictionPolicy}s and/or
 	 *            {@link ActiveEvictionPolicy}s
 	 */
-	public WindowingInvokable(ReduceFunction<IN> userFunction,
-			LinkedList<TriggerPolicy<IN>> triggerPolicies,
+	public WindowingInvokable(Function userFunction, LinkedList<TriggerPolicy<IN>> triggerPolicies,
 			LinkedList<EvictionPolicy<IN>> evictionPolicies) {
 		super(userFunction);
 
-		this.reducer = userFunction;
 		this.triggerPolicies = triggerPolicies;
 		this.evictionPolicies = evictionPolicies;
 
@@ -369,28 +365,4 @@ public class WindowingInvokable<IN> extends StreamInvokable<IN, Tuple2<IN, Strin
 		}
 	}
 
-	@Override
-	protected void callUserFunction() throws Exception {
-		Iterator<IN> reducedIterator = buffer.iterator();
-		IN reduced = null;
-
-		while (reducedIterator.hasNext() && reduced == null) {
-			reduced = reducedIterator.next();
-		}
-
-		while (reducedIterator.hasNext()) {
-			IN next = reducedIterator.next();
-			if (next != null) {
-				reduced = reducer.reduce(reduced, next);
-			}
-		}
-		if (reduced != null) {
-			String[] tmp = new String[currentTriggerPolicies.size()];
-			for (int i = 0; i < tmp.length; i++) {
-				tmp[i] = currentTriggerPolicies.get(i).toString();
-			}
-			collector.collect(new Tuple2<IN, String[]>(reduced, tmp));
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingReduceInvokable.java
new file mode 100644
index 0000000..89f98e0
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowingReduceInvokable.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+public class WindowingReduceInvokable<IN> extends WindowingInvokable<IN, IN> {
+
+	private static final long serialVersionUID = 1L;
+
+	ReduceFunction<IN> reducer;
+
+	public WindowingReduceInvokable(ReduceFunction<IN> userFunction,
+			LinkedList<TriggerPolicy<IN>> triggerPolicies,
+			LinkedList<EvictionPolicy<IN>> evictionPolicies) {
+		super(userFunction, triggerPolicies, evictionPolicies);
+		this.reducer = userFunction;
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+		Iterator<IN> reducedIterator = buffer.iterator();
+		IN reduced = null;
+
+		while (reducedIterator.hasNext() && reduced == null) {
+			reduced = reducedIterator.next();
+		}
+
+		while (reducedIterator.hasNext()) {
+			IN next = reducedIterator.next();
+			if (next != null) {
+				reduced = reducer.reduce(reduced, next);
+			}
+		}
+		if (reduced != null) {
+			collector.collect(reduced);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
index b74e952..6e045f6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
@@ -23,21 +23,24 @@ import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 
 /**
- * Represents a count based trigger or eviction policy.
- * Use the {@link Count#of(int)} to get an instance.
+ * Represents a count based trigger or eviction policy. Use the
+ * {@link Count#of(int)} to get an instance.
  */
 @SuppressWarnings("rawtypes")
 public class Count implements WindowingHelper {
 
 	private int count;
+	private int deleteOnEviction = 1;
+	private int startValue = CountTriggerPolicy.DEFAULT_START_VALUE;
 
 	/**
 	 * Specifies on which element a trigger or an eviction should happen (based
 	 * on the count of the elements).
 	 * 
-	 * This constructor does exactly the same as {@link Count#of(int)}. 
+	 * This constructor does exactly the same as {@link Count#of(int)}.
 	 * 
-	 * @param count the number of elements to count before trigger/evict
+	 * @param count
+	 *            the number of elements to count before trigger/evict
 	 */
 	public Count(int count) {
 		this.count = count;
@@ -45,12 +48,22 @@ public class Count implements WindowingHelper {
 
 	@Override
 	public EvictionPolicy<?> toEvict() {
-		return new CountEvictionPolicy(count);
+		return new CountEvictionPolicy(count, deleteOnEviction);
 	}
 
 	@Override
 	public TriggerPolicy<?> toTrigger() {
-		return new CountTriggerPolicy(count);
+		return new CountTriggerPolicy(count, startValue);
+	}
+
+	public Count withDelete(int deleteOnEviction) {
+		this.deleteOnEviction = deleteOnEviction;
+		return this;
+	}
+
+	public Count startingAt(int startValue) {
+		this.startValue = startValue;
+		return this;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
index 37f2902..0b73150 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
@@ -20,10 +20,11 @@ package org.apache.flink.streaming.api.windowing.helper;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 
 /**
@@ -39,7 +40,9 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 
 	private int timeVal;
 	private TimeUnit granularity;
-	private Extractor<Long, DATA> timeToData;
+	private Extractor<Long, DATA> longToDATAExtractor;
+	private TimeStamp<DATA> timeStamp;
+	private long delay;
 
 	/**
 	 * Creates an helper representing a trigger which triggers every given
@@ -47,50 +50,29 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 	 * 
 	 * @param timeVal
 	 *            The number of time units
-	 * @param granularity
+	 * @param timeUnit
 	 *            The unit of time such as minute oder millisecond. Note that
 	 *            the smallest possible granularity is milliseconds. Any smaller
 	 *            time unit might cause an error at runtime due to conversion
 	 *            problems.
-	 * @param timeToData
-	 *            This policy creates fake elements to not miss windows in case
-	 *            no element arrived within the duration of the window. This
-	 *            extractor should wrap a long into such an element of type
-	 *            DATA.
 	 */
-	public Time(int timeVal, TimeUnit granularity, Extractor<Long, DATA> timeToData) {
+	private Time(int timeVal, TimeUnit timeUnit) {
 		this.timeVal = timeVal;
-		this.granularity = granularity;
-		this.timeToData = timeToData;
-	}
-
-	/**
-	 * Creates an helper representing a trigger which triggers every given
-	 * timeVal or an eviction which evicts all elements older than timeVal.
-	 * 
-	 * The default granularity for timeVal used in this method is seconds.
-	 * 
-	 * @param timeVal
-	 *            The number of time units measured in seconds.
-	 * @param timeToData
-	 *            This policy creates fake elements to not miss windows in case
-	 *            no element arrived within the duration of the window. This
-	 *            extractor should wrap a long into such an element of type
-	 *            DATA.
-	 */
-	public Time(int timeVal, Extractor<Long, DATA> timeToData) {
-		this(timeVal, TimeUnit.SECONDS, timeToData);
+		this.granularity = timeUnit;
+		this.longToDATAExtractor = new NullExtractor<DATA>();
+		this.timeStamp = new DefaultTimeStamp<DATA>();
+		this.delay = 0;
 	}
 
 	@Override
 	public EvictionPolicy<DATA> toEvict() {
-		return new TimeEvictionPolicy<DATA>(granularityInMillis(), new DefaultTimeStamp<DATA>());
+		return new TimeEvictionPolicy<DATA>(granularityInMillis(), timeStamp);
 	}
 
 	@Override
 	public TriggerPolicy<DATA> toTrigger() {
-		return new TimeTriggerPolicy<DATA>(granularityInMillis(), new DefaultTimeStamp<DATA>(),
-				timeToData);
+		return new TimeTriggerPolicy<DATA>(granularityInMillis(), timeStamp, delay,
+				longToDATAExtractor);
 	}
 
 	/**
@@ -104,22 +86,39 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 	 *            the smallest possible granularity is milliseconds. Any smaller
 	 *            time unit might cause an error at runtime due to conversion
 	 *            problems.
-	 * @param timeToData
-	 *            This policy creates fake elements to not miss windows in case
-	 *            no element arrived within the duration of the window. This
-	 *            extractor should wrap a long into such an element of type
-	 *            DATA.
 	 * @return an helper representing a trigger which triggers every given
 	 *         timeVal or an eviction which evicts all elements older than
 	 *         timeVal.
 	 */
-	public static <DATA> Time<DATA> of(int timeVal, TimeUnit granularity,
-			Extractor<Long, DATA> timeToData) {
-		return new Time<DATA>(timeVal, granularity, timeToData);
+	public static <DATA> Time<DATA> of(int timeVal, TimeUnit granularity) {
+		return new Time<DATA>(timeVal, granularity);
+	}
+
+	@SuppressWarnings("unchecked")
+	public <R> Time<R> withTimeStamp(TimeStamp<R> timeStamp, Extractor<Long, R> extractor) {
+		this.timeStamp = (TimeStamp<DATA>) timeStamp;
+		this.longToDATAExtractor = (Extractor<Long, DATA>) extractor;
+		return (Time<R>) this;
+	}
+
+	public Time<DATA> withDelay(long delay) {
+		this.delay = delay;
+		return this;
 	}
 
 	private long granularityInMillis() {
 		return this.granularity.toMillis(this.timeVal);
 	}
 
+	public static class NullExtractor<T> implements Extractor<Long, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public T extract(Long in) {
+			return null;
+		}
+
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
index d5ae932..7a3c75a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
@@ -30,7 +30,7 @@ public class CountTriggerPolicy<IN> implements CloneableTriggerPolicy<IN> {
 	 */
 	private static final long serialVersionUID = -6357200688886103968L;
 
-	private static final int DEFAULT_START_VALUE = 0;
+	public static final int DEFAULT_START_VALUE = 0;
 
 	private int counter;
 	private int max;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
index d9acc84..b212293 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
@@ -87,10 +87,10 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
 
 		// delete and count expired tuples
 		int counter = 0;
+		long threshold = timestamp.getTimestamp(datapoint) - granularity;
 		while (!buffer.isEmpty()) {
 
-			if (timestamp.getTimestamp(buffer.getFirst()) < timestamp.getTimestamp(datapoint)
-					- granularity) {
+			if (timestamp.getTimestamp(buffer.getFirst()) < threshold) {
 				buffer.removeFirst();
 				counter++;
 			} else {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
index ff93ac9..1dd713f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -41,9 +41,11 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 	 */
 	private static final long serialVersionUID = -5122753802440196719L;
 
-	private long startTime;
-	private long granularity;
-	private TimeStamp<DATA> timestamp;
+	protected long startTime;
+	protected long granularity;
+	protected TimeStamp<DATA> timestamp;
+	protected long delay;
+
 	private Extractor<Long, DATA> longToDATAExtractor;
 
 	/**
@@ -54,8 +56,8 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 	 * example, the policy will trigger at every second point in time.
 	 * 
 	 * @param granularity
-	 *            The granularity of the trigger. If this value is set to 2 the
-	 *            policy will trigger at every second time point
+	 *            The granularity of the trigger. If this value is set to x the
+	 *            policy will trigger at every x-th time point
 	 * @param timestamp
 	 *            The {@link TimeStamp} to measure the time with. This can be
 	 *            either user defined of provided by the API.
@@ -72,10 +74,10 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 
 	/**
 	 * This is mostly the same as
-	 * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimeStamp)}. In addition
-	 * to granularity and timestamp a delay can be specified for the first
-	 * trigger. If the start time given by the timestamp is x, the delay is y,
-	 * and the granularity is z, the first trigger will happen at x+y+z.
+	 * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimeStamp)}. In
+	 * addition to granularity and timestamp a delay can be specified for the
+	 * first trigger. If the start time given by the timestamp is x, the delay
+	 * is y, and the granularity is z, the first trigger will happen at x+y+z.
 	 * 
 	 * @param granularity
 	 *            The granularity of the trigger. If this value is set to 2 the
@@ -98,21 +100,9 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 		this.startTime = timestamp.getStartTime() + delay;
 		this.timestamp = timestamp;
 		this.granularity = granularity;
+		this.delay = delay;
 		this.longToDATAExtractor = timeWrapper;
-	}
 
-	@Override
-	public synchronized boolean notifyTrigger(DATA datapoint) {
-		long recordTime = timestamp.getTimestamp(datapoint);
-		// start time is included, but end time is excluded: >=
-		if (recordTime >= startTime + granularity) {
-			if (granularity != 0) {
-				startTime = recordTime - ((recordTime - startTime) % granularity);
-			}
-			return true;
-		} else {
-			return false;
-		}
 	}
 
 	/**
@@ -126,7 +116,8 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 		// check if there is more then one window border missed
 		// use > here. In case >= would fit, the regular call will do the job.
 		while (timestamp.getTimestamp(datapoint) > startTime + granularity) {
-			fakeElements.add(longToDATAExtractor.extract(startTime += granularity));
+			startTime += granularity;
+			fakeElements.add(longToDATAExtractor.extract(startTime));
 		}
 		return (DATA[]) fakeElements.toArray();
 	}
@@ -162,7 +153,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 
 		if (System.currentTimeMillis() >= startTime + granularity) {
 			startTime += granularity;
-			callback.sendFakeElement(longToDATAExtractor.extract(startTime += granularity));
+			callback.sendFakeElement(longToDATAExtractor.extract(startTime));
 		}
 
 	}
@@ -192,8 +183,22 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 	}
 
 	@Override
+	public synchronized boolean notifyTrigger(DATA datapoint) {
+		long recordTime = timestamp.getTimestamp(datapoint);
+		// start time is included, but end time is excluded: >=
+		if (recordTime >= startTime + granularity) {
+			if (granularity != 0) {
+				startTime = recordTime - ((recordTime - startTime) % granularity);
+			}
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	@Override
 	public TimeTriggerPolicy<DATA> clone() {
-		return new TimeTriggerPolicy<DATA>(granularity, timestamp, 0, longToDATAExtractor);
+		return new TimeTriggerPolicy<DATA>(granularity, timestamp, delay, longToDATAExtractor);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index d65436c..c494d5f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -17,13 +17,25 @@
 
 package org.apache.flink.streaming.api;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-public class PrintTest{
+public class PrintTest implements Serializable {
 
 	private static final long MEMORYSIZE = 32;
 
@@ -44,12 +56,52 @@ public class PrintTest{
 			return true;
 		}
 	}
-	
+
 	@Test
 	public void test() throws Exception {
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-		env.generateSequence(1, 10).map(new IdentityMap()).filter(new FilterAll()).print();
+
+		List<Tuple2<String, Integer>> input = new ArrayList<Tuple2<String, Integer>>();
+
+		env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9)
+				.window(Time.of(2, TimeUnit.MILLISECONDS).withTimeStamp(new TimeStamp<Integer>() {
+
+					/**
+					 * 
+					 */
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public long getTimestamp(Integer value) {
+
+						return value;
+					}
+
+					@Override
+					public long getStartTime() {
+						return 1;
+					}
+				}, new Extractor<Long, Integer>() {
+
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Integer extract(Long in) {
+						return in.intValue();
+					}
+				})).every(Count.of(2)).reduceGroup(new GroupReduceFunction<Integer, String>() {
+
+					@Override
+					public void reduce(Iterable<Integer> values, Collector<String> out)
+							throws Exception {
+						String o = "|";
+						for (Integer v : values) {
+							o = o + v + "|";
+						}
+						out.collect(o);
+					}
+				}).print();
 		env.executeTest(MEMORYSIZE);
-		
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceTest.java
deleted file mode 100644
index f157a08..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchGroupReduceTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class BatchGroupReduceTest {
-
-	public static final class MySlidingBatchReduce implements GroupReduceFunction<Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void reduce(Iterable<Integer> values, Collector<String> out) throws Exception {
-			for (Integer value : values) {
-				out.collect(value.toString());
-			}
-			out.collect(END_OF_BATCH);
-		}
-	}
-
-	private final static String END_OF_BATCH = "end of batch";
-	private final static int SLIDING_BATCH_SIZE = 3;
-	private final static int SLIDE_SIZE = 2;
-
-	@Test
-	public void slidingBatchReduceTest() {
-		BatchGroupReduceInvokable<Integer, String> invokable = new BatchGroupReduceInvokable<Integer, String>(
-				new MySlidingBatchReduce(), SLIDING_BATCH_SIZE, SLIDE_SIZE);
-
-		List<String> expected = Arrays.asList("1", "2", "3", END_OF_BATCH, "3", "4", "5",
-				END_OF_BATCH, "5", "6", "7", END_OF_BATCH);
-		List<String> actual = MockInvokable.createAndExecute(invokable,
-				Arrays.asList(1, 2, 3, 4, 5, 6, 7));
-
-		assertEquals(expected, actual);
-	}
-
-	public static final class MyBatchReduce implements GroupReduceFunction<Double, Double> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void reduce(Iterable<Double> values, Collector<Double> out) throws Exception {
-
-			Double sum = 0.;
-			Double count = 0.;
-			for (Double value : values) {
-				sum += value;
-				count++;
-			}
-			if (count > 0) {
-				out.collect(Double.valueOf(sum / count));
-			}
-		}
-	}
-
-	private static final int BATCH_SIZE = 5;
-
-	@Test
-	public void nonSlidingBatchReduceTest() {
-		List<Double> inputs = new ArrayList<Double>();
-		for (Double i = 1.; i <= 100; i++) {
-			inputs.add(i);
-		}
-
-		BatchGroupReduceInvokable<Double, Double> invokable = new BatchGroupReduceInvokable<Double, Double>(
-				new MyBatchReduce(), BATCH_SIZE, BATCH_SIZE);
-
-		List<Double> avgs = MockInvokable.createAndExecute(invokable, inputs);
-
-		for (int i = 0; i < avgs.size(); i++) {
-			assertEquals(3.0 + i * BATCH_SIZE, avgs.get(i), 0);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
deleted file mode 100755
index b154f7f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.util.MockInvokable;
-import org.junit.Test;
-
-public class BatchReduceTest {
-
-	@Test
-	public void BatchReduceInvokableTest() {
-
-		List<Integer> inputs = new ArrayList<Integer>();
-		for (Integer i = 1; i <= 10; i++) {
-			inputs.add(i);
-		}
-		BatchReduceInvokable<Integer> invokable = new BatchReduceInvokable<Integer>(
-				new ReduceFunction<Integer>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Integer reduce(Integer value1, Integer value2) throws Exception {
-						return value1 + value2;
-					}
-				}, 3, 2);
-
-		List<Integer> expected = new ArrayList<Integer>();
-		expected.add(6);
-		expected.add(12);
-		expected.add(18);
-		expected.add(24);
-		expected.add(19);
-		assertEquals(expected, MockInvokable.createAndExecute(invokable, inputs));
-
-		List<Integer> inputs2 = new ArrayList<Integer>();
-		inputs2.add(1);
-		inputs2.add(2);
-		inputs2.add(-1);
-		inputs2.add(-3);
-		inputs2.add(-4);
-
-		BatchReduceInvokable<Integer> invokable2 = new BatchReduceInvokable<Integer>(
-				new ReduceFunction<Integer>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Integer reduce(Integer value1, Integer value2) throws Exception {
-						if (value1 <= value2) {
-							return value1;
-						} else {
-							return value2;
-						}
-					}
-				}, 2, 3);
-
-		List<Integer> expected2 = new ArrayList<Integer>();
-		expected2.add(1);
-		expected2.add(-4);
-
-		assertEquals(expected2, MockInvokable.createAndExecute(invokable2, inputs2));
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
deleted file mode 100755
index db2b8cf..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchGroupReduceTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.ObjectKeySelector;
-import org.apache.flink.streaming.util.keys.TupleKeySelector;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class GroupedBatchGroupReduceTest {
-
-	public static final class MySlidingBatchReduce1 implements GroupReduceFunction<Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void reduce(Iterable<Integer> values, Collector<String> out) throws Exception {
-			for (Integer value : values) {
-				out.collect(value.toString());
-			}
-			out.collect(END_OF_GROUP);
-		}
-	}
-
-	public static final class MySlidingBatchReduce2 extends
-			RichGroupReduceFunction<Tuple2<Integer, String>, String> {
-		private static final long serialVersionUID = 1L;
-
-		String openString;
-
-		@Override
-		public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<String> out)
-				throws Exception {
-			out.collect(openString);
-			for (Tuple2<Integer, String> value : values) {
-				out.collect(value.f0.toString());
-			}
-			out.collect(END_OF_GROUP);
-		}
-
-		@Override
-		public void open(Configuration c) {
-			openString = "open";
-		}
-	}
-
-	private final static String END_OF_GROUP = "end of group";
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void slidingBatchGroupReduceTest() {
-		@SuppressWarnings("rawtypes")
-		GroupedBatchGroupReduceInvokable<Integer, String> invokable1 = new GroupedBatchGroupReduceInvokable<Integer, String>(
-				new MySlidingBatchReduce1(), 2, 2, new ObjectKeySelector());
-
-		List<String> expected = Arrays.asList("1", "1", END_OF_GROUP, "3", "3", END_OF_GROUP, "2",
-				END_OF_GROUP);
-		List<String> actual = MockInvokable.createAndExecute(invokable1,
-				Arrays.asList(1, 1, 2, 3, 3));
-
-		assertEquals(expected, actual);
-
-		GroupedBatchGroupReduceInvokable<Tuple2<Integer, String>, String> invokable2 = new GroupedBatchGroupReduceInvokable<Tuple2<Integer, String>, String>(
-				new MySlidingBatchReduce2(), 2, 2, new TupleKeySelector<Tuple2<Integer, String>>(1));
-
-		expected = Arrays.asList("open", "1", "2", END_OF_GROUP, "open", "3", "3", END_OF_GROUP,
-				"open", "4", END_OF_GROUP);
-		actual = MockInvokable.createAndExecute(invokable2, Arrays.asList(
-				new Tuple2<Integer, String>(1, "a"), new Tuple2<Integer, String>(2, "a"),
-				new Tuple2<Integer, String>(3, "b"), new Tuple2<Integer, String>(3, "b"),
-				new Tuple2<Integer, String>(4, "a")));
-
-		assertEquals(expected, actual);
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
deleted file mode 100755
index 783119c..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedBatchReduceTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.ObjectKeySelector;
-import org.apache.flink.streaming.util.keys.TupleKeySelector;
-import org.junit.Test;
-
-public class GroupedBatchReduceTest {
-
-	@Test
-	public void BatchReduceInvokableTest() {
-
-		List<Integer> inputs = new ArrayList<Integer>();
-		inputs.add(1);
-		inputs.add(1);
-		inputs.add(5);
-		inputs.add(5);
-		inputs.add(5);
-		inputs.add(1);
-		inputs.add(1);
-		inputs.add(5);
-		inputs.add(1);
-		inputs.add(5);
-
-		List<Integer> expected = new ArrayList<Integer>();
-		expected.add(15);
-		expected.add(3);
-		expected.add(3);
-		expected.add(15);
-
-		GroupedBatchReduceInvokable<Integer> invokable = new GroupedBatchReduceInvokable<Integer>(
-				new ReduceFunction<Integer>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Integer reduce(Integer value1, Integer value2) throws Exception {
-						return value1 + value2;
-					}
-				}, 3, 2, new ObjectKeySelector<Integer>());
-
-		List<Integer> actual = MockInvokable.createAndExecute(invokable, inputs);
-		assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(actual));
-		assertEquals(expected.size(), actual.size());
-
-		List<Tuple2<Integer, String>> inputs2 = new ArrayList<Tuple2<Integer, String>>();
-		inputs2.add(new Tuple2<Integer, String>(1, "a"));
-		inputs2.add(new Tuple2<Integer, String>(0, "b"));
-		inputs2.add(new Tuple2<Integer, String>(2, "a"));
-		inputs2.add(new Tuple2<Integer, String>(-1, "a"));
-		inputs2.add(new Tuple2<Integer, String>(-2, "a"));
-		inputs2.add(new Tuple2<Integer, String>(10, "a"));
-		inputs2.add(new Tuple2<Integer, String>(2, "b"));
-		inputs2.add(new Tuple2<Integer, String>(1, "a"));
-
-		List<Tuple2<Integer, String>> expected2 = new ArrayList<Tuple2<Integer, String>>();
-		expected2.add(new Tuple2<Integer, String>(-1, "a"));
-		expected2.add(new Tuple2<Integer, String>(-2, "a"));
-		expected2.add(new Tuple2<Integer, String>(0, "b"));
-
-		GroupedBatchReduceInvokable<Tuple2<Integer, String>> invokable2 = new GroupedBatchReduceInvokable<Tuple2<Integer, String>>(
-				new ReduceFunction<Tuple2<Integer, String>>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1,
-							Tuple2<Integer, String> value2) throws Exception {
-						if (value1.f0 <= value2.f0) {
-							return value1;
-						} else {
-							return value2;
-						}
-					}
-				}, 3, 3, new TupleKeySelector<Tuple2<Integer, String>>(1));
-
-		List<Tuple2<Integer, String>> actual2 = MockInvokable.createAndExecute(invokable2, inputs2);
-
-		assertEquals(new HashSet<Tuple2<Integer, String>>(expected2),
-				new HashSet<Tuple2<Integer, String>>(actual2));
-		assertEquals(expected2.size(), actual2.size());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokableTest.java
deleted file mode 100644
index e93647f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokableTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.TupleKeySelector;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class GroupedWindowGroupReduceInvokableTest {
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
-	public void windowReduceTest() {
-		List<Tuple2<String, Integer>> inputs2 = new ArrayList<Tuple2<String, Integer>>();
-		inputs2.add(new Tuple2<String, Integer>("a", 1));
-		inputs2.add(new Tuple2<String, Integer>("a", 2));
-		inputs2.add(new Tuple2<String, Integer>("b", 2));
-		inputs2.add(new Tuple2<String, Integer>("b", 2));
-		inputs2.add(new Tuple2<String, Integer>("b", 5));
-		inputs2.add(new Tuple2<String, Integer>("a", 7));
-		inputs2.add(new Tuple2<String, Integer>("b", 9));
-		inputs2.add(new Tuple2<String, Integer>("b", 10));
-		// 1,2-4,5-7,8-10
-
-		List<Tuple2<String, Integer>> expected2 = new ArrayList<Tuple2<String, Integer>>();
-		expected2.add(new Tuple2<String, Integer>("a", 3));
-		expected2.add(new Tuple2<String, Integer>("b", 4));
-		expected2.add(new Tuple2<String, Integer>("b", 5));
-		expected2.add(new Tuple2<String, Integer>("a", 7));
-		expected2.add(new Tuple2<String, Integer>("b", 10));
-
-		GroupedWindowGroupReduceInvokable<Tuple2<String, Integer>, Tuple2<String, Integer>> invokable2 = new GroupedWindowGroupReduceInvokable<Tuple2<String, Integer>, Tuple2<String, Integer>>(
-				new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void reduce(Iterable<Tuple2<String, Integer>> values,
-							Collector<Tuple2<String, Integer>> out) throws Exception {
-						Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>("", 0);
-
-						for (@SuppressWarnings("unused")
-						Tuple2<String, Integer> value : values) {
-						}
-
-						for (Tuple2<String, Integer> value : values) {
-							outTuple.f0 = value.f0;
-							outTuple.f1 += value.f1;
-						}
-						out.collect(outTuple);
-					}
-				}, 2, 3, new TupleKeySelector( 0),
-				new TimeStamp<Tuple2<String, Integer>>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public long getTimestamp(Tuple2<String, Integer> value) {
-						return value.f1;
-					}
-
-					@Override
-					public long getStartTime() {
-						return 1;
-					}
-				});
-
-		List<Tuple2<String, Integer>> actual2 = MockInvokable.createAndExecute(invokable2, inputs2);
-
-		assertEquals(new HashSet<Tuple2<String, Integer>>(expected2),
-				new HashSet<Tuple2<String, Integer>>(actual2));
-		assertEquals(expected2.size(), actual2.size());
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
index 93848db..ebfed05 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowingInvokableTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -30,12 +30,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.api.windowing.extractor.Extractor;
 import org.apache.flink.streaming.api.windowing.policy.ActiveCloneableEvictionPolicyWrapper;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
 import org.apache.flink.streaming.util.MockInvokable;
@@ -78,7 +78,7 @@ public class GroupedWindowingInvokableTest {
 
 		LinkedList<TriggerPolicy<Integer>> centralTriggers = new LinkedList<TriggerPolicy<Integer>>();
 
-		GroupedWindowingInvokable<Integer> invokable = new GroupedWindowingInvokable<Integer>(
+		GroupedWindowingInvokable<Integer, Integer> invokable = new GroupedWindowingInvokable<Integer, Integer>(
 				new ReduceFunction<Integer>() {
 					private static final long serialVersionUID = 1L;
 
@@ -95,11 +95,11 @@ public class GroupedWindowingInvokableTest {
 					}
 				}, triggers, evictions, centralTriggers);
 
-		List<Tuple2<Integer, String[]>> result = MockInvokable.createAndExecute(invokable, inputs);
+		List<Integer> result = MockInvokable.createAndExecute(invokable, inputs);
 
 		List<Integer> actual = new LinkedList<Integer>();
-		for (Tuple2<Integer, String[]> current : result) {
-			actual.add(current.f0);
+		for (Integer current : result) {
+			actual.add(current);
 		}
 
 		assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(actual));
@@ -137,7 +137,7 @@ public class GroupedWindowingInvokableTest {
 
 		LinkedList<TriggerPolicy<Tuple2<Integer, String>>> centralTriggers = new LinkedList<TriggerPolicy<Tuple2<Integer, String>>>();
 
-		GroupedWindowingInvokable<Tuple2<Integer, String>> invokable2 = new GroupedWindowingInvokable<Tuple2<Integer, String>>(
+		GroupedWindowingInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>> invokable2 = new GroupedWindowingInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
 				new ReduceFunction<Tuple2<Integer, String>>() {
 					private static final long serialVersionUID = 1L;
 
@@ -153,12 +153,11 @@ public class GroupedWindowingInvokableTest {
 				}, new TupleKeySelector<Tuple2<Integer, String>>(1), triggers, evictions,
 				centralTriggers);
 
-		List<Tuple2<Tuple2<Integer, String>, String[]>> result = MockInvokable.createAndExecute(
-				invokable2, inputs2);
+		List<Tuple2<Integer, String>> result = MockInvokable.createAndExecute(invokable2, inputs2);
 
 		List<Tuple2<Integer, String>> actual2 = new LinkedList<Tuple2<Integer, String>>();
-		for (Tuple2<Tuple2<Integer, String>, String[]> current : result) {
-			actual2.add(current.f0);
+		for (Tuple2<Integer, String> current : result) {
+			actual2.add(current);
 		}
 
 		assertEquals(new HashSet<Tuple2<Integer, String>>(expected2),
@@ -266,14 +265,13 @@ public class GroupedWindowingInvokableTest {
 
 		LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>> distributedTriggers = new LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>>();
 
-		GroupedWindowingInvokable<Tuple2<Integer, String>> invokable = new GroupedWindowingInvokable<Tuple2<Integer, String>>(
+		GroupedWindowingInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>> invokable = new GroupedWindowingInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
 				myReduceFunction, new TupleKeySelector<Tuple2<Integer, String>>(1),
 				distributedTriggers, evictions, triggers);
 
 		ArrayList<Tuple2<Integer, String>> result = new ArrayList<Tuple2<Integer, String>>();
-		for (Tuple2<Tuple2<Integer, String>, String[]> t : MockInvokable.createAndExecute(
-				invokable, inputs)) {
-			result.add(t.f0);
+		for (Tuple2<Integer, String> t : MockInvokable.createAndExecute(invokable, inputs)) {
+			result.add(t);
 		}
 
 		assertEquals(new HashSet<Tuple2<Integer, String>>(expected),
@@ -330,7 +328,7 @@ public class GroupedWindowingInvokableTest {
 			}
 		};
 
-		GroupedWindowingInvokable<Integer> invokable = new GroupedWindowingInvokable<Integer>(
+		GroupedWindowingInvokable<Integer, Integer> invokable = new GroupedWindowingInvokable<Integer, Integer>(
 				myReduceFunction, new KeySelector<Integer, Integer>() {
 					private static final long serialVersionUID = 1L;
 
@@ -341,8 +339,8 @@ public class GroupedWindowingInvokableTest {
 				}, distributedTriggers, evictions, triggers);
 
 		ArrayList<Integer> result = new ArrayList<Integer>();
-		for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable, inputs)) {
-			result.add(t.f0);
+		for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+			result.add(t);
 		}
 
 		assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(result));
@@ -350,8 +348,8 @@ public class GroupedWindowingInvokableTest {
 	}
 
 	/**
-	 * Test for combination of centralized trigger and
-	 * distributed trigger at the same time
+	 * Test for combination of centralized trigger and distributed trigger at
+	 * the same time
 	 */
 	@Test
 	public void testGroupedWindowingInvokableCentralAndDistrTrigger() {
@@ -406,7 +404,7 @@ public class GroupedWindowingInvokableTest {
 			}
 		};
 
-		GroupedWindowingInvokable<Integer> invokable = new GroupedWindowingInvokable<Integer>(
+		GroupedWindowingInvokable<Integer, Integer> invokable = new GroupedWindowingInvokable<Integer, Integer>(
 				myReduceFunction, new KeySelector<Integer, Integer>() {
 					private static final long serialVersionUID = 1L;
 
@@ -417,8 +415,8 @@ public class GroupedWindowingInvokableTest {
 				}, distributedTriggers, evictions, triggers);
 
 		ArrayList<Integer> result = new ArrayList<Integer>();
-		for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable, inputs)) {
-			result.add(t.f0);
+		for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+			result.add(t);
 		}
 
 		assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(result));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java
deleted file mode 100644
index 3e3179a..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.util.Collector;
-import org.junit.Before;
-import org.junit.Test;
-
-public class WindowGroupReduceInvokableTest {
-
-	public static final class MySlidingWindowReduce implements GroupReduceFunction<Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void reduce(Iterable<Integer> values, Collector<String> out) throws Exception {
-			for (Integer value : values) {
-				out.collect(value.toString());
-			}
-			out.collect(EOW);
-		}
-	}
-
-	public static final class MyTimestamp implements TimeStamp<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		private Iterator<Long> timestamps;
-		private long start;
-
-		public MyTimestamp(List<Long> timestamps) {
-			this.timestamps = timestamps.iterator();
-			this.start = timestamps.get(0);
-		}
-
-		@Override
-		public long getTimestamp(Integer value) {
-			long ts = timestamps.next();
-			return ts;
-		}
-
-		@Override
-		public long getStartTime() {
-			return start;
-		}
-	}
-
-	private final static String EOW = "|";
-
-	private static List<WindowGroupReduceInvokable<Integer, String>> invokables = new ArrayList<WindowGroupReduceInvokable<Integer, String>>();
-	private static List<List<String>> expectedResults = new ArrayList<List<String>>();
-
-	@Before
-	public void before() {
-		long windowSize = 3;
-		long slideSize = 2;
-		List<Long> timestamps = Arrays.asList(101L, 102L, 103L, 104L, 105L, 106L, 107L, 108L, 109L,
-				110L);
-		expectedResults.add(Arrays.asList("1", "2", "3", EOW, "3", "4", "5", EOW, "5", "6", "7",
-				EOW, "7", "8", "9", EOW, "9", "10", EOW));
-		invokables.add(new WindowGroupReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
-				windowSize, slideSize, new MyTimestamp(timestamps)));
-
-		windowSize = 10;
-		slideSize = 5;
-		timestamps = Arrays.asList(101L, 103L, 121L, 122L, 123L, 124L, 180L, 181L, 185L, 190L);
-		expectedResults.add(Arrays.asList("1", "2", EOW, "3", "4", "5", "6", EOW, "3",
-				"4", "5", "6", EOW, "7", EOW, "7",
-				"8", "9", EOW, "8", "9", "10", EOW));
-		invokables.add(new WindowGroupReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
-				windowSize, slideSize, new MyTimestamp(timestamps)));
-
-		windowSize = 10;
-		slideSize = 4;
-		timestamps = Arrays.asList(101L, 103L, 110L, 112L, 113L, 114L, 120L, 121L, 125L, 130L);
-		expectedResults.add(Arrays.asList("1", "2","3" ,EOW, "3", "4", "5","6", EOW, "3", "4", "5", "6",
-				EOW, "5", "6", "7", "8", EOW, "7", "8", "9", EOW, "8","9",
-				"10", EOW));
-		invokables.add(new WindowGroupReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
-				windowSize, slideSize, new MyTimestamp(timestamps)));
-	}
-
-	@Test
-	public void slidingBatchReduceTest() {
-		List<List<String>> actualResults = new ArrayList<List<String>>();
-
-		for (WindowGroupReduceInvokable<Integer, String> invokable : invokables) {
-			List<String> result = MockInvokable.createAndExecute(invokable,
-					Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
-			actualResults.add(result);
-		}
-
-		Iterator<List<String>> actualResult = actualResults.iterator();
-
-		for (List<String> expectedResult : expectedResults) {
-			assertEquals(expectedResult, actualResult.next());
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
deleted file mode 100755
index 5d10eff..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.keys.TupleKeySelector;
-import org.junit.Test;
-
-public class WindowReduceInvokableTest {
-
-	@Test
-	public void windowReduceTest() {
-
-		List<Integer> inputs = new ArrayList<Integer>();
-		inputs.add(1);
-		inputs.add(2);
-		inputs.add(2);
-		inputs.add(3);
-		inputs.add(4);
-		inputs.add(5);
-		inputs.add(10);
-		inputs.add(11);
-		inputs.add(11);
-		// 1,2,3,4-3,4,5,6-5,6,7,8-7,8,9,10-9,10,11
-		// 12-12-5-10-32
-
-		List<Integer> expected = new ArrayList<Integer>();
-		expected.add(12);
-		expected.add(12);
-		expected.add(5);
-		expected.add(10);
-		expected.add(32);
-
-		WindowReduceInvokable<Integer> invokable = new WindowReduceInvokable<Integer>(
-				new ReduceFunction<Integer>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Integer reduce(Integer value1, Integer value2) throws Exception {
-						return value1 + value2;
-					}
-				}, 4, 2, new TimeStamp<Integer>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public long getTimestamp(Integer value) {
-						return value;
-					}
-
-					@Override
-					public long getStartTime() {
-						return 1;
-					}
-				});
-
-		assertEquals(expected, MockInvokable.createAndExecute(invokable, inputs));
-
-		List<Tuple2<String, Integer>> inputs2 = new ArrayList<Tuple2<String, Integer>>();
-		inputs2.add(new Tuple2<String, Integer>("a", 1));
-		inputs2.add(new Tuple2<String, Integer>("a", 2));
-		inputs2.add(new Tuple2<String, Integer>("b", 2));
-		inputs2.add(new Tuple2<String, Integer>("b", 2));
-		inputs2.add(new Tuple2<String, Integer>("b", 5));
-		inputs2.add(new Tuple2<String, Integer>("a", 7));
-		inputs2.add(new Tuple2<String, Integer>("b", 9));
-		inputs2.add(new Tuple2<String, Integer>("b", 10));
-
-		List<Tuple2<String, Integer>> expected2 = new ArrayList<Tuple2<String, Integer>>();
-		expected2.add(new Tuple2<String, Integer>("a", 3));
-		expected2.add(new Tuple2<String, Integer>("b", 4));
-		expected2.add(new Tuple2<String, Integer>("b", 5));
-		expected2.add(new Tuple2<String, Integer>("a", 7));
-		expected2.add(new Tuple2<String, Integer>("b", 10));
-
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		GroupedWindowReduceInvokable<Tuple2<String, Integer>> invokable2 = new GroupedWindowReduceInvokable<Tuple2<String, Integer>>(
-				new ReduceFunction<Tuple2<String, Integer>>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
-							Tuple2<String, Integer> value2) throws Exception {
-						return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
-					}
-				}, 2, 3, new TupleKeySelector(0), new TimeStamp<Tuple2<String, Integer>>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public long getTimestamp(Tuple2<String, Integer> value) {
-						return value.f1;
-					}
-
-					@Override
-					public long getStartTime() {
-						return 1;
-					}
-				});
-
-		List<Tuple2<String, Integer>> actual2 = MockInvokable.createAndExecute(invokable2, inputs2);
-		assertEquals(new HashSet<Tuple2<String, Integer>>(expected2),
-				new HashSet<Tuple2<String, Integer>>(actual2));
-		assertEquals(expected2.size(), actual2.size());
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
index 79df1c5..8e96a7c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowingInvokableTest.java
@@ -17,21 +17,20 @@
 
 package org.apache.flink.streaming.api.invokable.operator;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.util.MockInvokable;
 import org.junit.Test;
@@ -92,6 +91,7 @@ public class WindowingInvokableTest {
 		// trigger after 4, then every 2)
 		triggers.add(new TimeTriggerPolicy<Integer>(2L, myTimeStamp, 2L,
 				new Extractor<Long, Integer>() {
+
 					private static final long serialVersionUID = 1L;
 
 					@Override
@@ -103,12 +103,12 @@ public class WindowingInvokableTest {
 		// Always delete all elements older then 4
 		evictions.add(new TimeEvictionPolicy<Integer>(4L, myTimeStamp));
 
-		WindowingInvokable<Integer> invokable = new WindowingInvokable<Integer>(myReduceFunction,
-				triggers, evictions);
+		WindowingInvokable<Integer, Integer> invokable = new WindowingReduceInvokable<Integer>(
+				myReduceFunction, triggers, evictions);
 
 		ArrayList<Integer> result = new ArrayList<Integer>();
-		for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable, inputs)) {
-			result.add(t.f0);
+		for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+			result.add(t);
 		}
 
 		assertEquals(expected, result);
@@ -147,8 +147,8 @@ public class WindowingInvokableTest {
 		// time after the 3rd element
 		evictions.add(new CountEvictionPolicy<Integer>(2, 2, -1));
 
-		WindowingInvokable<Integer> invokable = new WindowingInvokable<Integer>(myReduceFunction,
-				triggers, evictions);
+		WindowingInvokable<Integer, Integer> invokable = new WindowingReduceInvokable<Integer>(
+				myReduceFunction, triggers, evictions);
 
 		List<Integer> expected = new ArrayList<Integer>();
 		expected.add(6);
@@ -157,8 +157,8 @@ public class WindowingInvokableTest {
 		expected.add(24);
 		expected.add(19);
 		List<Integer> result = new ArrayList<Integer>();
-		for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable, inputs)) {
-			result.add(t.f0);
+		for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+			result.add(t);
 		}
 		assertEquals(expected, result);
 
@@ -200,16 +200,16 @@ public class WindowingInvokableTest {
 		// time after on the 5th element
 		evictions.add(new CountEvictionPolicy<Integer>(3, 3, -1));
 
-		WindowingInvokable<Integer> invokable2 = new WindowingInvokable<Integer>(myReduceFunction,
-				triggers, evictions);
+		WindowingInvokable<Integer, Integer> invokable2 = new WindowingReduceInvokable<Integer>(
+				myReduceFunction, triggers, evictions);
 
 		List<Integer> expected2 = new ArrayList<Integer>();
 		expected2.add(1);
 		expected2.add(-4);
 
 		result = new ArrayList<Integer>();
-		for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable2, inputs2)) {
-			result.add(t.f0);
+		for (Integer t : MockInvokable.createAndExecute(invokable2, inputs2)) {
+			result.add(t);
 		}
 
 		assertEquals(expected2, result);
@@ -223,8 +223,8 @@ public class WindowingInvokableTest {
 		triggers.add(new CountTriggerPolicy<Integer>(3));
 
 		LinkedList<EvictionPolicy<Integer>> evictions = new LinkedList<EvictionPolicy<Integer>>();
-		evictions.add(new CountEvictionPolicy<Integer>(2,2));
-		evictions.add(new CountEvictionPolicy<Integer>(3,3));
+		evictions.add(new CountEvictionPolicy<Integer>(2, 2));
+		evictions.add(new CountEvictionPolicy<Integer>(3, 3));
 
 		List<Integer> inputs = new ArrayList<Integer>();
 		for (Integer i = 1; i <= 10; i++) {
@@ -258,12 +258,12 @@ public class WindowingInvokableTest {
 			}
 		};
 
-		WindowingInvokable<Integer> invokable = new WindowingInvokable<Integer>(myReduceFunction,
-				triggers, evictions);
+		WindowingInvokable<Integer, Integer> invokable = new WindowingReduceInvokable<Integer>(
+				myReduceFunction, triggers, evictions);
 
 		ArrayList<Integer> result = new ArrayList<Integer>();
-		for (Tuple2<Integer, String[]> t : MockInvokable.createAndExecute(invokable, inputs)) {
-			result.add(t.f0);
+		for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+			result.add(t);
 		}
 
 		assertEquals(expected, result);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
index 839564d..34e292a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
@@ -17,12 +17,15 @@
 
 package org.apache.flink.streaming.api.windowing.policy;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
-
 public class TimeTriggerPolicyTest {
 
 	@Test
@@ -49,15 +52,8 @@ public class TimeTriggerPolicyTest {
 		// test different granularity
 		for (long granularity = 0; granularity < 31; granularity++) {
 			// create policy
-			TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity, timeStamp,
-					new Extractor<Long, Integer>() {
-						private static final long serialVersionUID = 1L;
-
-						@Override
-						public Integer extract(Long in) {
-							return in.intValue();
-						}
-					});
+			TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity,
+					timeStamp, new Time.NullExtractor<Integer>());
 
 			// remember window border
 			// Remark: This might NOT work in case the timeStamp uses
@@ -88,7 +84,7 @@ public class TimeTriggerPolicyTest {
 	@Test
 	public void timeTriggerPreNotifyTest() {
 		// create some test data
-		Integer[] times = { 1, 3, 20, 26};
+		Integer[] times = { 1, 3, 20, 26 };
 
 		// create a timestamp
 		@SuppressWarnings("serial")
@@ -107,8 +103,9 @@ public class TimeTriggerPolicyTest {
 		};
 
 		// create policy
-		TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5, timeStamp,
-				new Extractor<Long, Integer>() {
+		TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5,
+				timeStamp, new Extractor<Long, Integer>() {
+
 					private static final long serialVersionUID = 1L;
 
 					@Override
@@ -117,16 +114,16 @@ public class TimeTriggerPolicyTest {
 					}
 				});
 
-		//expected result
-		Integer[][] result={{},{},{5,10,15},{25}};
-		
-		//call policy
-		for (int i=0;i<times.length;i++){
-			arrayEqualityCheck(result[i],policy.preNotifyTrigger(times[i]));
+		// expected result
+		Integer[][] result = { {}, {}, { 5, 10, 15 }, { 25 } };
+
+		// call policy
+		for (int i = 0; i < times.length; i++) {
+			arrayEqualityCheck(result[i], policy.preNotifyTrigger(times[i]));
 			policy.notifyTrigger(times[i]);
 		}
 	}
-	
+
 	private void arrayEqualityCheck(Object[] array1, Object[] array2) {
 		assertEquals("The result arrays must have the same length", array1.length, array2.length);
 		for (int i = 0; i < array1.length; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 2def142..375c86d 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -17,11 +17,14 @@
 
 package org.apache.flink.streaming.examples.ml;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.util.Collector;
 
 /**
@@ -58,7 +61,8 @@ public class IncrementalLearningSkeleton {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		// build new model on every second of new data
-		DataStream<Double[]> model = env.addSource(new TrainingDataSource()).window(5000)
+		DataStream<Double[]> model = env.addSource(new TrainingDataSource())
+				.window(Time.of(5000, TimeUnit.MILLISECONDS))
 				.reduceGroup(new PartialModelBuilder());
 
 		// use partial model for prediction

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f96ba06e/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/BasicExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/BasicExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/BasicExample.java
deleted file mode 100644
index c0e768b..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/BasicExample.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.windowing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.util.Collector;
-
-/**
- * A minimal example as introduction to the policy based windowing
- */
-public class BasicExample {
-
-	private static final int PARALLELISM = 1;
-
-	public static void main(String[] args) throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM);
-
-		// This reduce function does a String concat.
-		ReduceFunction<String> reduceFunction = new ReduceFunction<String>() {
-
-			/**
-			 * Auto generates version ID
-			 */
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String reduce(String value1, String value2) throws Exception {
-				return value1 + "|" + value2;
-			}
-
-		};
-
-		DataStream<Tuple2<String, String[]>> stream = env.addSource(new BasicSource())
-				.window(Count.of(5)).every(Count.of(2)).reduce(reduceFunction);
-
-		stream.print();
-
-		env.execute();
-	}
-
-	public static class BasicSource implements SourceFunction<String> {
-
-		private static final long serialVersionUID = 1L;
-		String str = new String("streaming");
-
-		@Override
-		public void invoke(Collector<String> out) throws Exception {
-			// continuous emit
-			while (true) {
-				out.collect(str);
-			}
-		}
-	}
-
-}