You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:16:41 UTC

[05/10] flink git commit: [FLINK-2780] Remove Old Windowing Logic and API

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
deleted file mode 100644
index 8d690cc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-
-public class SlidingCountGroupedPreReducer<T> extends SlidingGroupedPreReducer<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	private long windowSize;
-	private long slideSize;
-	private int start;
-
-	protected long index = 0;
-
-	public SlidingCountGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
-			KeySelector<T, ?> key, long windowSize, long slideSize, int start) {
-		super(reducer, serializer, key);
-		if (windowSize > slideSize) {
-			this.windowSize = windowSize;
-			this.slideSize = slideSize;
-			this.start = start;
-		} else {
-			throw new RuntimeException(
-					"Window size needs to be larger than slide size for the sliding pre-reducer");
-		}
-		index = index - start;
-	}
-
-	@Override
-	protected void afterStore() {
-		index++;
-	}
-
-	@Override
-	public void store(T element) throws Exception {
-		if (index >= 0) {
-			super.store(element);
-		} else {
-			index++;
-		}
-	}
-
-	@Override
-	protected boolean currentEligible(T next) {
-		if (index <= slideSize) {
-			return true;
-		} else {
-			return index == windowSize;
-		}
-	}
-
-	@Override
-	protected void afterEmit() {
-		if (index >= slideSize) {
-			index = index - slideSize;
-		}
-	}
-
-	@Override
-	public SlidingCountGroupedPreReducer<T> clone() {
-		return new SlidingCountGroupedPreReducer<T>(reducer, serializer, key, windowSize,
-				slideSize, start);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
deleted file mode 100644
index db14eb0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducer.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class SlidingCountPreReducer<T> extends SlidingPreReducer<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	private long windowSize;
-	private long slideSize;
-	private int start;
-
-	protected long index = 0;
-
-	public SlidingCountPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
-			long windowSize, long slideSize, int start) {
-		super(reducer, serializer);
-		if (windowSize > slideSize) {
-			this.windowSize = windowSize;
-			this.slideSize = slideSize;
-			this.start = start;
-		} else {
-			throw new RuntimeException(
-					"Window size needs to be larger than slide size for the sliding pre-reducer");
-		}
-		index = index - start;
-	}
-
-	@Override
-	protected void afterStore() {
-		index++;
-	}
-
-	@Override
-	public void store(T element) throws Exception {
-		if (index >= 0) {
-			super.store(element);
-		} else {
-			index++;
-		}
-	}
-
-	@Override
-	protected boolean currentEligible(T next) {
-		if (index <= slideSize) {
-			return true;
-		} else {
-			return index == windowSize;
-		}
-	}
-
-	@Override
-	protected void afterEmit() {
-		if (index >= slideSize) {
-			index = index - slideSize;
-		}
-	}
-
-	@Override
-	public SlidingCountPreReducer<T> clone() {
-		return new SlidingCountPreReducer<T>(reducer, serializer, windowSize, slideSize, start);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
deleted file mode 100644
index 6e5462c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-
-/**
- * Grouped pre-reducer for sliding eviction policy
- * (the slide size is smaller than the window size).
- */
-public abstract class SlidingGroupedPreReducer<T> extends SlidingPreReducer<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	protected Map<Object, T> currentReducedMap = new HashMap<Object, T>();
-	protected LinkedList<Map<Object, T>> reducedMap = new LinkedList<Map<Object, T>>();
-
-	protected KeySelector<T, ?> key;
-
-	public SlidingGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
-			KeySelector<T, ?> key) {
-		super(reducer, serializer);
-		this.key = key;
-	}
-
-	public boolean addFinalAggregate(StreamWindow<T> currentWindow) throws Exception {
-		Map<Object, T> finalReduce = null;
-
-		if (!reducedMap.isEmpty()) {
-			finalReduce = reducedMap.get(0);
-			for (int i = 1; i < reducedMap.size(); i++) {
-				finalReduce = reduceMaps(finalReduce, reducedMap.get(i));
-
-			}
-			if (currentReducedMap != null) {
-				finalReduce = reduceMaps(finalReduce, currentReducedMap);
-			}
-
-		} else {
-			finalReduce = currentReducedMap;
-		}
-
-		if (finalReduce != null) {
-			currentWindow.addAll(finalReduce.values());
-			return true;
-		} else {
-			return false;
-		}
-
-	}
-
-	private Map<Object, T> reduceMaps(Map<Object, T> first, Map<Object, T> second) throws Exception {
-
-		Map<Object, T> reduced = new HashMap<Object, T>();
-
-		// Get the common keys in the maps
-		Set<Object> interSection = new HashSet<Object>();
-		Set<Object> diffFirst = new HashSet<Object>();
-		Set<Object> diffSecond = new HashSet<Object>();
-
-		for (Object key : first.keySet()) {
-			if (second.containsKey(key)) {
-				interSection.add(key);
-			} else {
-				diffFirst.add(key);
-			}
-		}
-
-		for (Object key : second.keySet()) {
-			if (!interSection.contains(key)) {
-				diffSecond.add(key);
-			}
-		}
-
-		// Reduce the common keys
-		for (Object key : interSection) {
-			reduced.put(
-					key,
-					reducer.reduce(serializer.copy(first.get(key)),
-							serializer.copy(second.get(key))));
-		}
-
-		for (Object key : diffFirst) {
-			reduced.put(key, first.get(key));
-		}
-
-		for (Object key : diffSecond) {
-			reduced.put(key, second.get(key));
-		}
-
-		return reduced;
-	}
-
-	protected void updateCurrent(T element) throws Exception {
-		if (currentReducedMap == null) {
-			currentReducedMap = new HashMap<Object, T>();
-			currentReducedMap.put(key.getKey(element), element);
-		} else {
-			Object nextKey = key.getKey(element);
-			T last = currentReducedMap.get(nextKey);
-			if (last == null) {
-				currentReducedMap.put(nextKey, element);
-			} else {
-				currentReducedMap.put(nextKey, reducer.reduce(serializer.copy(last), element));
-			}
-		}
-	}
-
-	@Override
-	protected void removeLastReduced() {
-		reducedMap.removeFirst();
-	}
-
-	@Override
-	protected void addCurrentToBuffer(T element) throws Exception {
-		reducedMap.add(currentReducedMap);
-	}
-
-	@Override
-	protected void resetCurrent() {
-		currentReducedMap = null;
-		elementsSinceLastPreAggregate = 0;
-	}
-
-	@Override
-	protected boolean currentNotEmpty() {
-		return currentReducedMap != null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
deleted file mode 100644
index e2c46a3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import java.util.LinkedList;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-/**
- * Non-grouped pre-reducer for sliding eviction policy
- * (the slide size is smaller than the window size).
- */
-public abstract class SlidingPreReducer<T> extends WindowBuffer<T> implements PreAggregator {
-
-	private static final long serialVersionUID = 1L;
-
-	protected ReduceFunction<T> reducer;
-
-	protected T currentReduced;
-	protected LinkedList<T> reduced = new LinkedList<T>();
-	protected LinkedList<Integer> elementsPerPreAggregate = new LinkedList<Integer>();
-
-	protected TypeSerializer<T> serializer;
-
-	protected int toRemove = 0;
-
-	protected int elementsSinceLastPreAggregate = 0;
-
-	public SlidingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer) {
-		this.reducer = reducer;
-		this.serializer = serializer;
-	}
-
-	public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
-		StreamWindow<T> currentWindow = createEmptyWindow();
-
-		try {
-			if (addFinalAggregate(currentWindow) || emitEmpty) {
-				collector.collect(new StreamRecord<StreamWindow<T>>(currentWindow));
-			} 
-			afterEmit();
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-
-	}
-
-	protected void afterEmit() {
-		// Do nothing by default
-	}
-
-	public boolean addFinalAggregate(StreamWindow<T> currentWindow) throws Exception {
-		T finalReduce = null;
-
-		if (!reduced.isEmpty()) {
-			finalReduce = reduced.get(0);
-			for (int i = 1; i < reduced.size(); i++) {
-				finalReduce = reducer.reduce(finalReduce, serializer.copy(reduced.get(i)));
-
-			}
-			if (currentReduced != null) {
-				finalReduce = reducer.reduce(finalReduce, serializer.copy(currentReduced));
-			}
-
-		} else {
-			finalReduce = currentReduced;
-		}
-
-		if (finalReduce != null) {
-			currentWindow.add(finalReduce);
-			return true;
-		} else {
-			return false;
-		}
-
-	}
-
-	public void store(T element) throws Exception {
-		addToBufferIfEligible(element);
-		afterStore();
-	}
-
-	protected void afterStore() {
-		// Do nothing by default
-	}
-
-	protected void addToBufferIfEligible(T element) throws Exception {
-		if (currentEligible(element) && currentNotEmpty()) {
-			addCurrentToBuffer(element);
-			elementsPerPreAggregate.add(elementsSinceLastPreAggregate);
-			elementsSinceLastPreAggregate = 0;
-			resetCurrent();
-		}
-		updateCurrent(element);
-
-		elementsSinceLastPreAggregate++;
-	}
-
-	protected void resetCurrent() {
-		currentReduced = null;
-	}
-
-	protected boolean currentNotEmpty() {
-		return currentReduced != null;
-	}
-
-	protected void updateCurrent(T element) throws Exception {
-		if (currentReduced == null) {
-			currentReduced = element;
-		} else {
-			currentReduced = reducer.reduce(serializer.copy(currentReduced), element);
-		}
-	}
-
-	protected void addCurrentToBuffer(T element) throws Exception {
-		reduced.add(currentReduced);
-	}
-
-	protected abstract boolean currentEligible(T next);
-
-	public void evict(int n) {
-		toRemove += n;
-
-		Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
-		while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
-			toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
-			removeLastReduced();
-			lastPreAggregateSize = elementsPerPreAggregate.peek();
-		}
-
-		if (lastPreAggregateSize == null) {
-			toRemove = 0;
-		}
-	}
-
-	protected void removeLastReduced() {
-		reduced.removeFirst();
-	}
-
-	public static int max(int a, int b) {
-		if (a > b) {
-			return a;
-		} else {
-			return b;
-		}
-	}
-
-	@Override
-	public abstract SlidingPreReducer<T> clone();
-
-	@Override
-	public String toString() {
-		return currentReduced.toString();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
deleted file mode 100644
index cdb4207..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducer.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-/**
- * Non-grouped pre-reducer for sliding time eviction policy.
- */
-public class SlidingTimeGroupedPreReducer<T> extends SlidingGroupedPreReducer<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	private long windowSize;
-	private long slideSize;
-	private TimestampWrapper<T> timestampWrapper;
-	private T lastStored;
-	protected long windowStartTime;
-
-	public SlidingTimeGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
-			KeySelector<T, ?> key, long windowSize, long slideSize,
-			TimestampWrapper<T> timestampWrapper) {
-		super(reducer, serializer, key);
-		if (windowSize > slideSize) {
-			this.windowSize = windowSize;
-			this.slideSize = slideSize;
-		} else {
-			throw new RuntimeException(
-					"Window size needs to be larger than slide size for the sliding pre-reducer");
-		}
-		this.timestampWrapper = timestampWrapper;
-		this.windowStartTime = timestampWrapper.getStartTime();
-	}
-
-	@Override
-	public void store(T element) throws Exception {
-		super.store(element);
-		lastStored = element;
-	}
-
-	@Override
-	public SlidingTimeGroupedPreReducer<T> clone() {
-		return new SlidingTimeGroupedPreReducer<T>(reducer, serializer, key, windowSize, slideSize,
-				timestampWrapper);
-	}
-
-	@Override
-	public String toString() {
-		return currentReducedMap.toString();
-	}
-
-	@Override
-	protected void afterEmit() {
-		if (lastStored != null) {
-			long lastTime = timestampWrapper.getTimestamp(lastStored);
-			if (lastTime - windowStartTime >= slideSize) {
-				windowStartTime = windowStartTime + slideSize;
-			}
-		}
-	}
-
-	@Override
-	public void evict(int n) {
-		toRemove += n;
-		Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
-
-		while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
-			toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
-			removeLastReduced();
-			lastPreAggregateSize = elementsPerPreAggregate.peek();
-		}
-
-		if (toRemove > 0 && lastPreAggregateSize == null) {
-			resetCurrent();
-			toRemove = 0;
-		}
-	}
-
-	@Override
-	protected boolean currentEligible(T next) {
-		return windowStartTime == timestampWrapper.getStartTime()
-				|| timestampWrapper.getTimestamp(next) - windowStartTime >= slideSize;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
deleted file mode 100644
index d84505c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-/**
- * Non-grouped pre-reducer for sliding time eviction policy
- * (the policies are based on time, and the slide size is smaller than the window size).
- */
-public class SlidingTimePreReducer<T> extends SlidingPreReducer<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	private long windowSize;
-	private long slideSize;
-	private TimestampWrapper<T> timestampWrapper;
-	private T lastStored;
-	protected long windowStartTime;
-
-	public SlidingTimePreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
-			long windowSize, long slideSize, TimestampWrapper<T> timestampWrapper) {
-		super(reducer, serializer);
-		if (windowSize > slideSize) {
-			this.windowSize = windowSize;
-			this.slideSize = slideSize;
-		} else {
-			throw new RuntimeException(
-					"Window size needs to be larger than slide size for the sliding pre-reducer");
-		}
-		this.timestampWrapper = timestampWrapper;
-		this.windowStartTime = timestampWrapper.getStartTime();
-	}
-
-	@Override
-	public void store(T element) throws Exception {
-		super.store(element);
-		lastStored = element;
-	}
-
-	@Override
-	public SlidingTimePreReducer<T> clone() {
-		return new SlidingTimePreReducer<T>(reducer, serializer, windowSize, slideSize,
-				timestampWrapper);
-	}
-
-	@Override
-	public String toString() {
-		return currentReduced.toString();
-	}
-
-	@Override
-	protected void afterEmit() {
-		if (lastStored != null) {
-			long lastTime = timestampWrapper.getTimestamp(lastStored);
-			if (lastTime - windowStartTime >= slideSize) {
-				windowStartTime = windowStartTime + slideSize;
-			}
-		}
-	}
-
-	@Override
-	public void evict(int n) {
-		toRemove += n;
-		Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
-
-		while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
-			toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
-			reduced.removeFirst();
-			lastPreAggregateSize = elementsPerPreAggregate.peek();
-		}
-
-		if (toRemove > 0 && lastPreAggregateSize == null) {
-			currentReduced = null;
-			elementsSinceLastPreAggregate = 0;
-			toRemove = 0;
-		}
-	}
-
-	@Override
-	protected boolean currentEligible(T next) {
-		return windowStartTime == timestampWrapper.getStartTime()
-				|| timestampWrapper.getTimestamp(next) - windowStartTime >= slideSize;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
deleted file mode 100644
index 37d3aae..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-/**
- * Grouped pre-reducer for tumbling eviction polciy.
- */
-public class TumblingGroupedPreReducer<T> extends WindowBuffer<T> implements PreAggregator {
-
-	private static final long serialVersionUID = 1L;
-
-	private ReduceFunction<T> reducer;
-	private KeySelector<T, ?> keySelector;
-
-	private Map<Object, T> reducedValues;
-
-	private TypeSerializer<T> serializer;
-
-	private boolean evict = true;
-
-	public TumblingGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
-			TypeSerializer<T> serializer) {
-		this(reducer, keySelector, serializer, true);
-	}
-
-	public TumblingGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
-			TypeSerializer<T> serializer, boolean evict) {
-		this.reducer = reducer;
-		this.serializer = serializer;
-		this.keySelector = keySelector;
-		this.reducedValues = new HashMap<Object, T>();
-		this.evict = evict;
-	}
-
-	public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
-
-		if (!reducedValues.isEmpty()) {
-			StreamWindow<T> currentWindow = createEmptyWindow();
-			currentWindow.addAll(reducedValues.values());
-			collector.collect(new StreamRecord<StreamWindow<T>>(currentWindow));
-		} else if (emitEmpty) {
-			collector.collect(new StreamRecord<StreamWindow<T>>(createEmptyWindow()));
-		}
-		if (evict) {
-			reducedValues.clear();
-		}
-	}
-
-	public void store(T element) throws Exception {
-		Object key = keySelector.getKey(element);
-
-		T reduced = reducedValues.get(key);
-
-		if (reduced == null) {
-			reduced = element;
-		} else {
-			reduced = reducer.reduce(serializer.copy(reduced), element);
-		}
-
-		reducedValues.put(key, reduced);
-	}
-
-	@Override
-	public void evict(int n) {
-	}
-
-	@Override
-	public TumblingGroupedPreReducer<T> clone() {
-		return new TumblingGroupedPreReducer<T>(reducer, keySelector, serializer, evict);
-	}
-
-	@Override
-	public String toString() {
-		return reducedValues.toString();
-	}
-
-	public TumblingGroupedPreReducer<T> noEvict() {
-		this.evict = false;
-		return this;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
deleted file mode 100644
index 3a10be7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-/**
- * Non-grouped pre-reducer for tumbling eviction policy (the slide size is the
- * same as the window size).
- */
-public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggregator {
-
-	private static final long serialVersionUID = 1L;
-
-	private ReduceFunction<T> reducer;
-
-	private T reduced;
-	private TypeSerializer<T> serializer;
-
-	private boolean evict = true;
-
-	public TumblingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer) {
-		this(reducer, serializer, true);
-	}
-
-	private TumblingPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
-			boolean evict) {
-		this.reducer = reducer;
-		this.serializer = serializer;
-		this.evict = evict;
-	}
-
-	public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
-		if (reduced != null) {
-			StreamWindow<T> currentWindow = createEmptyWindow();
-			currentWindow.add(reduced);
-			collector.collect(new StreamRecord<StreamWindow<T>>(currentWindow));
-		} else if (emitEmpty) {
-			collector.collect(new StreamRecord<StreamWindow<T>>(createEmptyWindow()));
-		}
-
-		if (evict) {
-			reduced = null;
-		}
-	}
-
-	public void store(T element) throws Exception {
-		if (reduced == null) {
-			reduced = element;
-		} else {
-			reduced = reducer.reduce(serializer.copy(reduced), element);
-		}
-	}
-
-	public void evict(int n) {
-	}
-
-	@Override
-	public TumblingPreReducer<T> clone() {
-		return new TumblingPreReducer<T>(reducer, serializer, evict);
-	}
-
-	@Override
-	public String toString() {
-		return reduced.toString();
-	}
-
-	@Override
-	public WindowBuffer<T> emitEmpty() {
-		emitEmpty = true;
-		return this;
-	}
-
-	public TumblingPreReducer<T> noEvict() {
-		this.evict = false;
-		return this;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
deleted file mode 100644
index 6e87d0b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-/**
- * Class for defining specialized buffers to store/emit window data.
- * Pre-aggregators should be implemented using this interface.
- */
-public abstract class WindowBuffer<T> implements Serializable, Cloneable {
-
-	private static final long serialVersionUID = 1L;
-
-	protected Integer nextID = 1;
-	protected boolean sequentialID = false;
-	protected boolean emitEmpty = false;
-	protected boolean emitPerGroup = false;
-
-	public abstract void store(T element) throws Exception;
-
-	public abstract void evict(int n);
-
-	public abstract void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector);
-
-	public abstract WindowBuffer<T> clone();
-
-	public WindowBuffer<T> emitEmpty() {
-		emitEmpty = true;
-		return this;
-	}
-
-	public WindowBuffer<T> sequentialID() {
-		sequentialID = true;
-		return this;
-	}
-
-	protected StreamWindow<T> createEmptyWindow() {
-		return sequentialID ? new StreamWindow<T>(nextID++) : new StreamWindow<T>();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 0b8482d..3a224e4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -41,17 +41,19 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.SplitStream;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
 import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
@@ -95,6 +97,8 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 
 		DataStreamSink<Long> connected = dataStream1.connect(dataStream2)
 				.flatMap(new CoFlatMapFunction<Long, Long, Long>() {
+					private static final long serialVersionUID = 1L;
+
 					@Override
 					public void flatMap1(Long value, Collector<Long> out) throws Exception {
 					}
@@ -103,14 +107,17 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 					public void flatMap2(Long value, Collector<Long> out) throws Exception {
 					}
 				}).name("testCoFlatMap")
-				.window(Count.of(10))
-				.foldWindow(0L, new FoldFunction<Long, Long>() {
+				.windowAll(GlobalWindows.create())
+				.trigger(PurgingTrigger.of(CountTrigger.of(10)))
+				.fold(0L, new FoldFunction<Long, Long>() {
+					private static final long serialVersionUID = 1L;
+
 					@Override
 					public Long fold(Long accumulator, Long value) throws Exception {
 						return null;
 					}
-				}).name("testWindowFold")
-				.flatten()
+				})
+				.name("testWindowFold")
 				.print();
 
 		//test functionality through the operator names in the execution plan
@@ -133,15 +140,15 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 	public void testPartitioning() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-		DataStream src1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
-		DataStream src2 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
-		ConnectedStreams connected = src1.connect(src2);
+		DataStream<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
+		DataStream<Tuple2<Long, Long>> src2 = env.fromElements(new Tuple2<>(0L, 0L));
+		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connected = src1.connect(src2);
 
 		//Testing DataStream grouping
-		DataStream group1 = src1.keyBy(0);
-		DataStream group2 = src1.keyBy(1, 0);
-		DataStream group3 = src1.keyBy("f0");
-		DataStream group4 = src1.keyBy(new FirstSelector());
+		DataStream<Tuple2<Long, Long>> group1 = src1.keyBy(0);
+		DataStream<Tuple2<Long, Long>> group2 = src1.keyBy(1, 0);
+		DataStream<Tuple2<Long, Long>> group3 = src1.keyBy("f0");
+		DataStream<Tuple2<Long, Long>> group4 = src1.keyBy(new FirstSelector());
 
 		int id1 = createDownStreamId(group1);
 		int id2 = createDownStreamId(group2);
@@ -159,10 +166,10 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		assertTrue(isKeyed(group4));
 
 		//Testing DataStream partitioning
-		DataStream partition1 = src1.partitionByHash(0);
-		DataStream partition2 = src1.partitionByHash(1, 0);
-		DataStream partition3 = src1.partitionByHash("f0");
-		DataStream partition4 = src1.partitionByHash(new FirstSelector());
+		DataStream<Tuple2<Long, Long>> partition1 = src1.partitionByHash(0);
+		DataStream<Tuple2<Long, Long>> partition2 = src1.partitionByHash(1, 0);
+		DataStream<Tuple2<Long, Long>> partition3 = src1.partitionByHash("f0");
+		DataStream<Tuple2<Long, Long>> partition4 = src1.partitionByHash(new FirstSelector());
 
 		int pid1 = createDownStreamId(partition1);
 		int pid2 = createDownStreamId(partition2);
@@ -187,9 +194,9 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 			}
 		};
 
-		DataStream customPartition1 = src1.partitionCustom(longPartitioner, 0);
-		DataStream customPartition3 = src1.partitionCustom(longPartitioner, "f0");
-		DataStream customPartition4 = src1.partitionCustom(longPartitioner, new FirstSelector());
+		DataStream<Tuple2<Long, Long>> customPartition1 = src1.partitionCustom(longPartitioner, 0);
+		DataStream<Tuple2<Long, Long>> customPartition3 = src1.partitionCustom(longPartitioner, "f0");
+		DataStream<Tuple2<Long, Long>> customPartition4 = src1.partitionCustom(longPartitioner, new FirstSelector());
 
 		int cid1 = createDownStreamId(customPartition1);
 		int cid2 = createDownStreamId(customPartition3);
@@ -204,19 +211,19 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		assertFalse(isKeyed(customPartition4));
 
 		//Testing ConnectedStreams grouping
-		ConnectedStreams connectedGroup1 = connected.keyBy(0, 0);
+		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup1 = connected.keyBy(0, 0);
 		Integer downStreamId1 = createDownStreamId(connectedGroup1);
 
-		ConnectedStreams connectedGroup2 = connected.keyBy(new int[]{0}, new int[]{0});
+		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup2 = connected.keyBy(new int[]{0}, new int[]{0});
 		Integer downStreamId2 = createDownStreamId(connectedGroup2);
 
-		ConnectedStreams connectedGroup3 = connected.keyBy("f0", "f0");
+		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup3 = connected.keyBy("f0", "f0");
 		Integer downStreamId3 = createDownStreamId(connectedGroup3);
 
-		ConnectedStreams connectedGroup4 = connected.keyBy(new String[]{"f0"}, new String[]{"f0"});
+		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup4 = connected.keyBy(new String[]{"f0"}, new String[]{"f0"});
 		Integer downStreamId4 = createDownStreamId(connectedGroup4);
 
-		ConnectedStreams connectedGroup5 = connected.keyBy(new FirstSelector(), new FirstSelector());
+		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedGroup5 = connected.keyBy(new FirstSelector(), new FirstSelector());
 		Integer downStreamId5 = createDownStreamId(connectedGroup5);
 
 		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(), downStreamId1)));
@@ -241,19 +248,19 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		assertTrue(isKeyed(connectedGroup5));
 
 		//Testing ConnectedStreams partitioning
-		ConnectedStreams connectedPartition1 = connected.partitionByHash(0, 0);
+		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition1 = connected.partitionByHash(0, 0);
 		Integer connectDownStreamId1 = createDownStreamId(connectedPartition1);
 
-		ConnectedStreams connectedPartition2 = connected.partitionByHash(new int[]{0}, new int[]{0});
+		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition2 = connected.partitionByHash(new int[]{0}, new int[]{0});
 		Integer connectDownStreamId2 = createDownStreamId(connectedPartition2);
 
-		ConnectedStreams connectedPartition3 = connected.partitionByHash("f0", "f0");
+		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition3 = connected.partitionByHash("f0", "f0");
 		Integer connectDownStreamId3 = createDownStreamId(connectedPartition3);
 
-		ConnectedStreams connectedPartition4 = connected.partitionByHash(new String[]{"f0"}, new String[]{"f0"});
+		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition4 = connected.partitionByHash(new String[]{"f0"}, new String[]{"f0"});
 		Integer connectDownStreamId4 = createDownStreamId(connectedPartition4);
 
-		ConnectedStreams connectedPartition5 = connected.partitionByHash(new FirstSelector(), new FirstSelector());
+		ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition5 = connected.partitionByHash(new FirstSelector(), new FirstSelector());
 		Integer connectDownStreamId5 = createDownStreamId(connectedPartition5);
 
 		assertTrue(isPartitioned(env.getStreamGraph().getStreamEdge(src1.getId(),
@@ -295,7 +302,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 	public void testParallelism() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-		DataStreamSource<Tuple2<Long, Long>> src = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+		DataStreamSource<Tuple2<Long, Long>> src = env.fromElements(new Tuple2<>(0L, 0L));
 		env.setParallelism(10);
 
 		SingleOutputStreamOperator<Long, ?> map = src.map(new MapFunction<Tuple2<Long, Long>, Long>() {
@@ -306,18 +313,20 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		}).name("MyMap");
 
 		DataStream<Long> windowed = map
-				.window(Count.of(10))
-				.foldWindow(0L, new FoldFunction<Long, Long>() {
+				.windowAll(GlobalWindows.create())
+				.trigger(PurgingTrigger.of(CountTrigger.of(10)))
+				.fold(0L, new FoldFunction<Long, Long>() {
 					@Override
 					public Long fold(Long accumulator, Long value) throws Exception {
 						return null;
 					}
-				})
-				.flatten();
+				});
 
 		windowed.addSink(new NoOpSink<Long>());
 
 		DataStreamSink<Long> sink = map.addSink(new SinkFunction<Long>() {
+			private static final long serialVersionUID = 1L;
+
 			@Override
 			public void invoke(Long value) throws Exception {
 			}
@@ -343,6 +352,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 			src.setParallelism(3);
 			fail();
 		} catch (IllegalArgumentException success) {
+			// do nothing
 		}
 
 		DataStreamSource<Long> parallelSource = env.generateSequence(0, 0);
@@ -373,26 +383,33 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 			}
 		});
 
-		assertEquals(TypeExtractor.getForObject(new Tuple2<Integer, String>(0, "")), map.getType());
+		assertEquals(TypeExtractor.getForObject(new Tuple2<>(0, "")), map.getType());
 
-		WindowedDataStream<String> window = map
-				.window(Count.of(5))
-				.mapWindow(new WindowMapFunction<Tuple2<Integer, String>, String>() {
+		DataStream<String> window = map
+				.windowAll(GlobalWindows.create())
+				.trigger(PurgingTrigger.of(CountTrigger.of(5)))
+				.apply(new AllWindowFunction<Tuple2<Integer, String>, String, GlobalWindow>() {
 					@Override
-					public void mapWindow(Iterable<Tuple2<Integer, String>> values, Collector<String> out) throws Exception {
+					public void apply(GlobalWindow window,
+							Iterable<Tuple2<Integer, String>> values,
+							Collector<String> out) throws Exception {
+
 					}
 				});
 
 		assertEquals(TypeExtractor.getForClass(String.class), window.getType());
 
 		DataStream<CustomPOJO> flatten = window
-				.foldWindow(new CustomPOJO(), new FoldFunction<String, CustomPOJO>() {
+				.windowAll(GlobalWindows.create())
+				.trigger(PurgingTrigger.of(CountTrigger.of(5)))
+				.fold(new CustomPOJO(), new FoldFunction<String, CustomPOJO>() {
+					private static final long serialVersionUID = 1L;
+
 					@Override
 					public CustomPOJO fold(CustomPOJO accumulator, String value) throws Exception {
 						return null;
 					}
-				})
-				.flatten();
+				});
 
 		assertEquals(TypeExtractor.getForClass(CustomPOJO.class), flatten.getType());
 	}
@@ -415,6 +432,8 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 
 
 		FlatMapFunction<Long, Integer> flatMapFunction = new FlatMapFunction<Long, Integer>() {
+			private static final long serialVersionUID = 1L;
+
 			@Override
 			public void flatMap(Long value, Collector<Integer> out) throws Exception {
 			}
@@ -430,8 +449,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 			}
 		};
 
-		DataStream<Integer> unionFilter = map
-				.union(flatMap)
+		DataStream<Integer> unionFilter = map.union(flatMap)
 				.filter(filterFunction);
 
 		unionFilter.addSink(new NoOpSink<Integer>());
@@ -471,6 +489,8 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 
 		ConnectedStreams<Integer, Integer> connect = map.connect(flatMap);
 		CoMapFunction<Integer, Integer, String> coMapper = new CoMapFunction<Integer, Integer, String>() {
+			private static final long serialVersionUID = 1L;
+
 			@Override
 			public String map1(Integer value) {
 				return null;
@@ -597,16 +617,19 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		return operator.getUserFunction();
 	}
 
-	private static Integer createDownStreamId(DataStream dataStream) {
+	private static Integer createDownStreamId(DataStream<?> dataStream) {
 		return dataStream.print().getTransformation().getId();
 	}
 
-	private static boolean isKeyed(DataStream dataStream) {
+	private static boolean isKeyed(DataStream<?> dataStream) {
 		return dataStream instanceof KeyedStream;
 	}
 
+	@SuppressWarnings("rawtypes,unchecked")
 	private static Integer createDownStreamId(ConnectedStreams dataStream) {
-		SingleOutputStreamOperator coMap = dataStream.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
+		SingleOutputStreamOperator<?, ?> coMap = dataStream.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
+			private static final long serialVersionUID = 1L;
+
 			@Override
 			public Object map1(Tuple2<Long, Long> value) {
 				return null;
@@ -621,7 +644,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 		return coMap.getId();
 	}
 
-	private static boolean isKeyed(ConnectedStreams dataStream) {
+	private static boolean isKeyed(ConnectedStreams<?, ?> dataStream) {
 		return (dataStream.getFirstInput() instanceof KeyedStream && dataStream.getSecondInput() instanceof KeyedStream);
 	}
 
@@ -634,6 +657,8 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
 	}
 
 	private static class FirstSelector implements KeySelector<Tuple2<Long, Long>, Long> {
+		private static final long serialVersionUID = 1L;
+
 		@Override
 		public Long getKey(Tuple2<Long, Long> value) throws Exception {
 			return value.f0;

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 5e46508..2775299 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -33,15 +33,18 @@ import org.apache.flink.streaming.api.datastream.IterativeStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.streaming.api.windowing.helper.Delta;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.After;
@@ -59,6 +62,7 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 @SuppressWarnings("serial")
 public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
@@ -117,8 +121,8 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 
 		IterativeStream<Tuple2<Long, Tuple2<String, Long>>> it = sourceStream1.map(new MapFunction<Tuple2<Long, Tuple2<String, Long>>,Tuple2<Long, Tuple2<String, Long>>>(){
 
-					Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<Long, Tuple2<String, Long>>(
-							0L, new Tuple2<String, Long>("", 0L));
+					Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<>(
+							0L, new Tuple2<>("", 0L));
 
 					@Override
 					public Tuple2<Long, Tuple2<String, Long>> map(
@@ -167,38 +171,38 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 				"peach-d\n" + "peach-d\n";
 
 		List<Tuple5<Integer, String, Character, Double, Boolean>> input = Arrays.asList(
-				new Tuple5<Integer, String, Character, Double, Boolean>(1, "apple", 'j', 0.1, false),
-				new Tuple5<Integer, String, Character, Double, Boolean>(1, "peach", 'b', 0.8, false),
-				new Tuple5<Integer, String, Character, Double, Boolean>(1, "orange", 'c', 0.7, true),
-				new Tuple5<Integer, String, Character, Double, Boolean>(2, "apple", 'd', 0.5, false),
-				new Tuple5<Integer, String, Character, Double, Boolean>(2, "peach", 'j', 0.6, false),
-				new Tuple5<Integer, String, Character, Double, Boolean>(3, "orange", 'b', 0.2, true),
-				new Tuple5<Integer, String, Character, Double, Boolean>(6, "apple", 'c', 0.1, false),
-				new Tuple5<Integer, String, Character, Double, Boolean>(7, "peach", 'd', 0.4, false),
-				new Tuple5<Integer, String, Character, Double, Boolean>(8, "orange", 'j', 0.2, true),
-				new Tuple5<Integer, String, Character, Double, Boolean>(10, "apple", 'b', 0.1, false),
-				new Tuple5<Integer, String, Character, Double, Boolean>(10, "peach", 'c', 0.5, false),
-				new Tuple5<Integer, String, Character, Double, Boolean>(11, "orange", 'd', 0.3, true),
-				new Tuple5<Integer, String, Character, Double, Boolean>(11, "apple", 'j', 0.3, false),
-				new Tuple5<Integer, String, Character, Double, Boolean>(12, "peach", 'b', 0.9, false),
-				new Tuple5<Integer, String, Character, Double, Boolean>(13, "orange", 'c', 0.7, true),
-				new Tuple5<Integer, String, Character, Double, Boolean>(15, "apple", 'd', 0.2, false),
-				new Tuple5<Integer, String, Character, Double, Boolean>(16, "peach", 'j', 0.8, false),
-				new Tuple5<Integer, String, Character, Double, Boolean>(16, "orange", 'b', 0.8, true),
-				new Tuple5<Integer, String, Character, Double, Boolean>(16, "apple", 'c', 0.1, false),
-				new Tuple5<Integer, String, Character, Double, Boolean>(17, "peach", 'd', 1.0, true));
+				new Tuple5<>(1, "apple", 'j', 0.1, false),
+				new Tuple5<>(1, "peach", 'b', 0.8, false),
+				new Tuple5<>(1, "orange", 'c', 0.7, true),
+				new Tuple5<>(2, "apple", 'd', 0.5, false),
+				new Tuple5<>(2, "peach", 'j', 0.6, false),
+				new Tuple5<>(3, "orange", 'b', 0.2, true),
+				new Tuple5<>(6, "apple", 'c', 0.1, false),
+				new Tuple5<>(7, "peach", 'd', 0.4, false),
+				new Tuple5<>(8, "orange", 'j', 0.2, true),
+				new Tuple5<>(10, "apple", 'b', 0.1, false),
+				new Tuple5<>(10, "peach", 'c', 0.5, false),
+				new Tuple5<>(11, "orange", 'd', 0.3, true),
+				new Tuple5<>(11, "apple", 'j', 0.3, false),
+				new Tuple5<>(12, "peach", 'b', 0.9, false),
+				new Tuple5<>(13, "orange", 'c', 0.7, true),
+				new Tuple5<>(15, "apple", 'd', 0.2, false),
+				new Tuple5<>(16, "peach", 'j', 0.8, false),
+				new Tuple5<>(16, "orange", 'b', 0.8, true),
+				new Tuple5<>(16, "apple", 'c', 0.1, false),
+				new Tuple5<>(17, "peach", 'd', 1.0, true));
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableTimestamps();
 
 		SingleOutputStreamOperator<Tuple5<Integer, String, Character, Double, Boolean>, DataStreamSource<Tuple5<Integer, String, Character, Double, Boolean>>> sourceStream21 = env.fromCollection(input);
 		DataStream<OuterPojo> sourceStream22 = env.addSource(new PojoSource());
 
 		sourceStream21
+				.extractTimestamp(new MyTimestampExtractor())
 				.keyBy(2, 2)
-				.window(Time.of(10, new MyTimestamp(), 0))
-				.every(Time.of(4, new MyTimestamp(), 0))
+				.timeWindow(Time.of(10, TimeUnit.MILLISECONDS), Time.of(4, TimeUnit.MILLISECONDS))
 				.maxBy(3)
-				.flatten()
 				.map(new MyMapFunction2())
 				.flatMap(new MyFlatMapFunction())
 				.connect(sourceStream22)
@@ -244,11 +248,13 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		DataStream<Long> sourceStream32 = env.generateSequence(10001, 20000);
 
 		sourceStream31.filter(new PrimeFilterFunction())
-				.window(Count.of(100))
-				.max(0).flatten()
+				.windowAll(GlobalWindows.create())
+				.trigger(PurgingTrigger.of(CountTrigger.of(100)))
+				.max(0)
 				.union(sourceStream32.filter(new PrimeFilterFunction())
-						.window(Count.of(100))
-						.max(0).flatten())
+						.windowAll(GlobalWindows.create())
+						.trigger(PurgingTrigger.of(CountTrigger.of(100)))
+						.max(0))
 				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
 
 		sourceStream31.flatMap(new DivisorsFlatMapFunction())
@@ -257,11 +263,13 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 
 			@Override
 			public Tuple2<Long, Integer> map(Long value) throws Exception {
-				return new Tuple2<Long, Integer>(value, 1);
+				return new Tuple2<>(value, 1);
 			}
 		})
 				.keyBy(0)
-				.window(Count.of(10000)).sum(1).flatten()
+				.window(GlobalWindows.create())
+				.trigger(PurgingTrigger.of(CountTrigger.of(10_000)))
+				.sum(1)
 				.filter(new FilterFunction<Tuple2<Long, Integer>>() {
 
 					@Override
@@ -275,6 +283,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 	}
 
 	@Test
+	@Ignore
 	public void complexIntegrationTest4() throws Exception {
 		//Testing mapping and delta-policy windowing with custom class
 
@@ -290,13 +299,14 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 				"((499,587),90)\n" + "((516,606),93)\n" + "((517,609),94)\n" + "((534,628),97)\n" + "((535,631),98)";
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
 
 		env.addSource(new RectangleSource())
 				.global()
 				.map(new RectangleMapFunction())
-				.window(Delta.of(0.0, new MyDelta(), new Tuple2<Rectangle, Integer>(new Rectangle(100, 100), 0)))
-				.mapWindow(new MyWindowMapFunction())
-				.flatten()
+				.windowAll(GlobalWindows.create())
+				.trigger(PurgingTrigger.of(DeltaTrigger.of(0.0, new MyDelta())))
+				.apply(new MyWindowMapFunction())
 				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
 
 		env.execute();
@@ -361,6 +371,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 
 
 	@Test
+	@Ignore
 	public void complexIntegrationTest6() throws Exception {
 		//Testing java collections and date-time types
 
@@ -376,88 +387,89 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 
 		SimpleDateFormat ft = new SimpleDateFormat("dd-MM-yyyy");
 
-		ArrayList<Tuple2<Date, HashMap<Character, Integer>>> sales = new ArrayList<Tuple2<Date, HashMap<Character,
-				Integer>>>();
-		HashMap<Character, Integer> sale1 = new HashMap<Character, Integer>();
+		ArrayList<Tuple2<Date, HashMap<Character, Integer>>> sales = new ArrayList<>();
+		HashMap<Character, Integer> sale1 = new HashMap<>();
 		sale1.put('a', 2);
 		sale1.put('c', 2);
 		sale1.put('d', 1);
 		sale1.put('f', 1);
-		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("03-06-2014"), sale1));
+		sales.add(new Tuple2<>(ft.parse("03-06-2014"), sale1));
 
-		HashMap<Character, Integer> sale2 = new HashMap<Character, Integer>();
+		HashMap<Character, Integer> sale2 = new HashMap<>();
 		sale2.put('a', 1);
 		sale2.put('b', 2);
 		sale2.put('d', 1);
-		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("10-06-2014"), sale2));
+		sales.add(new Tuple2<>(ft.parse("10-06-2014"), sale2));
 
-		HashMap<Character, Integer> sale3 = new HashMap<Character, Integer>();
+		HashMap<Character, Integer> sale3 = new HashMap<>();
 		sale3.put('a', 3);
 		sale3.put('b', 1);
 		sale3.put('c', 2);
 		sale3.put('f', 1);
-		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("29-06-2014"), sale3));
+		sales.add(new Tuple2<>(ft.parse("29-06-2014"), sale3));
 
-		HashMap<Character, Integer> sale4 = new HashMap<Character, Integer>();
+		HashMap<Character, Integer> sale4 = new HashMap<>();
 		sale4.put('a', 1);
 		sale4.put('d', 1);
 		sale4.put('e', 1);
-		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("15-07-2014"), sale4));
+		sales.add(new Tuple2<>(ft.parse("15-07-2014"), sale4));
 
-		HashMap<Character, Integer> sale5 = new HashMap<Character, Integer>();
+		HashMap<Character, Integer> sale5 = new HashMap<>();
 		sale5.put('b', 2);
 		sale5.put('c', 3);
 		sale5.put('f', 1);
-		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("24-07-2014"), sale5));
+		sales.add(new Tuple2<>(ft.parse("24-07-2014"), sale5));
 
-		HashMap<Character, Integer> sale6 = new HashMap<Character, Integer>();
+		HashMap<Character, Integer> sale6 = new HashMap<>();
 		sale6.put('a', 4);
 		sale6.put('b', 2);
 		sale6.put('c', 2);
 		sale6.put('e', 1);
-		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("17-08-2014"), sale6));
+		sales.add(new Tuple2<>(ft.parse("17-08-2014"), sale6));
 
-		HashMap<Character, Integer> sale7 = new HashMap<Character, Integer>();
+		HashMap<Character, Integer> sale7 = new HashMap<>();
 		sale7.put('a', 2);
 		sale7.put('b', 2);
 		sale7.put('c', 3);
 		sale7.put('d', 1);
 		sale7.put('e', 1);
 		sale7.put('f', 2);
-		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("27-08-2014"), sale7));
+		sales.add(new Tuple2<>(ft.parse("27-08-2014"), sale7));
 
-		HashMap<Character, Integer> sale8 = new HashMap<Character, Integer>();
+		HashMap<Character, Integer> sale8 = new HashMap<>();
 		sale8.put('a', 3);
 		sale8.put('b', 1);
 		sale8.put('c', 3);
 		sale8.put('d', 2);
 		sale8.put('f', 1);
-		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("16-09-2014"), sale8));
+		sales.add(new Tuple2<>(ft.parse("16-09-2014"), sale8));
 
-		HashMap<Character, Integer> sale9 = new HashMap<Character, Integer>();
+		HashMap<Character, Integer> sale9 = new HashMap<>();
 		sale9.put('a', 1);
 		sale9.put('b', 3);
 		sale9.put('c', 4);
 		sale9.put('d', 1);
 		sale9.put('e', 1);
 		sale9.put('f', 1);
-		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("25-09-2014"), sale9));
+		sales.add(new Tuple2<>(ft.parse("25-09-2014"), sale9));
 
-		HashMap<Character, Integer> sale10 = new HashMap<Character, Integer>();
+		HashMap<Character, Integer> sale10 = new HashMap<>();
 		sale10.put('a', 3);
 		sale10.put('b', 2);
 		sale10.put('c', 3);
 		sale10.put('d', 2);
 		sale10.put('e', 1);
 		sale10.put('f', 1);
-		sales.add(new Tuple2<Date, HashMap<Character, Integer>>(ft.parse("01-10-2014"), sale10));
+		sales.add(new Tuple2<>(ft.parse("01-10-2014"), sale10));
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableTimestamps();
 
 		DataStream<Tuple2<Date, HashMap<Character, Integer>>> sourceStream6 = env.fromCollection(sales);
-		sourceStream6.window(Time.of(1, new Timestamp6()))
-				.reduceWindow(new SalesReduceFunction())
-				.flatten()
+		sourceStream6
+				.extractTimestamp(new Timestamp6())
+				.timeWindowAll(Time.of(1, TimeUnit.MILLISECONDS))
+				.reduce(new SalesReduceFunction())
 				.flatMap(new FlatMapFunction6())
 				.writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
 
@@ -478,7 +490,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		@Override
 		public Tuple4<Integer, String, Double, Boolean> map(Tuple5<Integer, String, Character, Double,
 				Boolean> value) throws Exception {
-			return new Tuple4<Integer, String, Double, Boolean>(value.f0, value.f1 + "-" + value.f2,
+			return new Tuple4<>(value.f0, value.f1 + "-" + value.f2,
 					value.f3, value.f4);
 		}
 
@@ -509,7 +521,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		@Override
 		public void run(SourceContext<Tuple2<Long, Tuple2<String, Long>>> ctx) throws Exception {
 			for (int i = 0; i < 20; i++) {
-				Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<Long, Tuple2<String, Long>>(1L, new Tuple2<String, Long>("a", 1L));
+				Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<>(1L, new Tuple2<>("a", 1L));
 				ctx.collect(result);
 			}
 		}
@@ -526,17 +538,28 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 
 		@Override
 		public Tuple2<Long, Tuple2<String, Long>> map(Tuple2<Long, Tuple2<String, Long>> value) throws Exception {
-			return new Tuple2<Long, Tuple2<String, Long>>(value.f0 + 1, value.f1);
+			return new Tuple2<>(value.f0 + 1, value.f1);
 		}
 	}
 
-	private static class MyTimestamp implements Timestamp<Tuple5<Integer, String, Character, Double, Boolean>> {
+	private static class MyTimestampExtractor implements TimestampExtractor<Tuple5<Integer, String, Character, Double, Boolean>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public long getTimestamp(Tuple5<Integer, String, Character, Double, Boolean> value) {
+		public long extractTimestamp(Tuple5<Integer, String, Character, Double, Boolean> value, long currentTimestamp) {
 			return (long) value.f0;
 		}
+
+		@Override
+		public long emitWatermark(Tuple5<Integer, String, Character, Double, Boolean> value,
+				long currentTimestamp) {
+			return (long) value.f0 - 1;
+		}
+
+		@Override
+		public long getCurrentWatermark() {
+			return Long.MIN_VALUE;
+		}
 	}
 
 	private static class MyFlatMapFunction implements FlatMapFunction<Tuple4<Integer, String, Double,
@@ -573,7 +596,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 
 		@Override
 		public Iterable<String> select(Tuple2<Long, Tuple2<String, Long>> value) {
-			List<String> output = new ArrayList<String>();
+			List<String> output = new ArrayList<>();
 			if (value.f0 == 10) {
 				output.add("iterate");
 				output.add("firstOutput");
@@ -627,6 +650,8 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 
 		@Override
 		public void run(SourceContext<Rectangle> ctx) throws Exception {
+			// emit once as the initializer of the delta trigger
+			ctx.collect(rectangle);
 			for (int i = 0; i < 100; i++) {
 				ctx.collect(rectangle);
 				rectangle = rectangle.next();
@@ -644,16 +669,15 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 
 		@Override
 		public Tuple2<Rectangle, Integer> map(Rectangle value) throws Exception {
-			return new Tuple2<Rectangle, Integer>(value, counter++);
+			return new Tuple2<>(value, counter++);
 		}
 	}
 
-	private static class MyWindowMapFunction implements WindowMapFunction<Tuple2<Rectangle, Integer>,
-			Tuple2<Rectangle, Integer>> {
+	private static class MyWindowMapFunction implements AllWindowFunction<Tuple2<Rectangle, Integer>, Tuple2<Rectangle, Integer>, GlobalWindow> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void mapWindow(Iterable<Tuple2<Rectangle, Integer>> values, Collector<Tuple2<Rectangle,
+		public void apply(GlobalWindow window, Iterable<Tuple2<Rectangle, Integer>> values, Collector<Tuple2<Rectangle,
 				Integer>> out) throws Exception {
 			out.collect(values.iterator().next());
 		}
@@ -670,14 +694,28 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		}
 	}
 
-	private static class Timestamp6 implements Timestamp<Tuple2<Date, HashMap<Character, Integer>>> {
+	private static class Timestamp6 implements TimestampExtractor<Tuple2<Date, HashMap<Character, Integer>>> {
 
 		@Override
-		public long getTimestamp(Tuple2<Date, HashMap<Character, Integer>> value) {
+		public long extractTimestamp(Tuple2<Date, HashMap<Character, Integer>> value,
+				long currentTimestamp) {
 			Calendar cal = Calendar.getInstance();
 			cal.setTime(value.f0);
 			return 12 * (cal.get(Calendar.YEAR)) + cal.get(Calendar.MONTH);
 		}
+
+		@Override
+		public long emitWatermark(Tuple2<Date, HashMap<Character, Integer>> value,
+				long currentTimestamp) {
+			Calendar cal = Calendar.getInstance();
+			cal.setTime(value.f0);
+			return 12 * (cal.get(Calendar.YEAR)) + cal.get(Calendar.MONTH) - 1;
+		}
+
+		@Override
+		public long getCurrentWatermark() {
+			return 0;
+		}
 	}
 
 	private static class SalesReduceFunction implements ReduceFunction<Tuple2<Date, HashMap<Character, Integer>>> {
@@ -697,7 +735,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 				}
 				map1.put(key, volume1 + volume2);
 			}
-			return new Tuple2<Date, HashMap<Character, Integer>>(value2.f0, map1);
+			return new Tuple2<>(value2.f0, map1);
 		}
 	}
 
@@ -710,9 +748,9 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 			Calendar cal = Calendar.getInstance();
 			cal.setTime(value.f0);
 			for (Character key : value.f1.keySet()) {
-				out.collect(new Tuple2<Integer, Tuple2<Character, Integer>>(cal.get(Calendar.MONTH)
+				out.collect(new Tuple2<>(cal.get(Calendar.MONTH)
 						+ 1,
-						new Tuple2<Character, Integer>(key, value.f1.get(key))));
+						new Tuple2<>(key, value.f1.get(key))));
 			}
 		}
 	}
@@ -722,7 +760,7 @@ public class ComplexIntegrationTest extends StreamingMultipleProgramsTestBase {
 		@Override
 		public ArrayList<Character> map(Tuple2<Date, HashMap<Character, Integer>> value)
 				throws Exception {
-			ArrayList<Character> list = new ArrayList<Character>();
+			ArrayList<Character> list = new ArrayList<>();
 			for (Character ch : value.f1.keySet()) {
 				for (int i = 0; i < value.f1.get(ch); i++) {
 					list.add(ch);

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java
new file mode 100644
index 0000000..c98a659
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTupleTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.streaming.api.functions.windowing.delta.extractor.ArrayFromTuple;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ArrayFromTupleTest {
+
+	private String[] testStrings;
+
+	@Before
+	public void init() {
+		testStrings = new String[Tuple.MAX_ARITY];
+		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
+			testStrings[i] = Integer.toString(i);
+		}
+	}
+
+	@Test
+	public void testConvertFromTupleToArray() throws InstantiationException, IllegalAccessException {
+		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
+			Tuple currentTuple = (Tuple) CLASSES[i].newInstance();
+			String[] currentArray = new String[i + 1];
+			for (int j = 0; j <= i; j++) {
+				currentTuple.setField(testStrings[j], j);
+				currentArray[j] = testStrings[j];
+			}
+			arrayEqualityCheck(currentArray, new ArrayFromTuple().extract(currentTuple));
+		}
+	}
+
+	@Test
+	public void testUserSpecifiedOrder() throws InstantiationException, IllegalAccessException {
+		Tuple currentTuple = (Tuple) CLASSES[Tuple.MAX_ARITY - 1].newInstance();
+		for (int i = 0; i < Tuple.MAX_ARITY; i++) {
+			currentTuple.setField(testStrings[i], i);
+		}
+
+		String[] expected = { testStrings[5], testStrings[3], testStrings[6], testStrings[7],
+				testStrings[0] };
+		arrayEqualityCheck(expected, new ArrayFromTuple(5, 3, 6, 7, 0).extract(currentTuple));
+
+		String[] expected2 = { testStrings[0], testStrings[Tuple.MAX_ARITY - 1] };
+		arrayEqualityCheck(expected2,
+				new ArrayFromTuple(0, Tuple.MAX_ARITY - 1).extract(currentTuple));
+
+		String[] expected3 = { testStrings[Tuple.MAX_ARITY - 1], testStrings[0] };
+		arrayEqualityCheck(expected3,
+				new ArrayFromTuple(Tuple.MAX_ARITY - 1, 0).extract(currentTuple));
+
+		String[] expected4 = { testStrings[13], testStrings[4], testStrings[5], testStrings[4],
+				testStrings[2], testStrings[8], testStrings[6], testStrings[2], testStrings[8],
+				testStrings[3], testStrings[5], testStrings[2], testStrings[16], testStrings[4],
+				testStrings[3], testStrings[2], testStrings[6], testStrings[4], testStrings[7],
+				testStrings[4], testStrings[2], testStrings[8], testStrings[7], testStrings[2] };
+		arrayEqualityCheck(expected4, new ArrayFromTuple(13, 4, 5, 4, 2, 8, 6, 2, 8, 3, 5, 2, 16,
+				4, 3, 2, 6, 4, 7, 4, 2, 8, 7, 2).extract(currentTuple));
+	}
+
+	private void arrayEqualityCheck(Object[] array1, Object[] array2) {
+		assertEquals("The result arrays must have the same length", array1.length, array2.length);
+		for (int i = 0; i < array1.length; i++) {
+			assertEquals("Unequal fields at position " + i, array1[i], array2[i]);
+		}
+	}
+
+	private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class,
+			Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
+			Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
+			Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
+			Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
+			Tuple24.class, Tuple25.class };
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java
new file mode 100644
index 0000000..3b098c3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtractTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ConcatenatedExtractTest {
+
+	private String[] testStringArray1 = { "1", "2", "3" };
+	private int[] testIntArray1 = { 1, 2, 3 };
+	private String[] testStringArray2 = { "4", "5", "6" };
+	private int[] testIntArray2 = { 4, 5, 6 };
+	private String[] testStringArray3 = { "7", "8", "9" };
+	private int[] testIntArray3 = { 7, 8, 9 };
+	private Tuple2<String[], int[]>[] testTuple2Array;
+	private Tuple2<String[], int[]> testTuple2;
+	private Tuple2<Tuple2<String[], int[]>, Tuple2<String[], int[]>[]> testData;
+
+	@SuppressWarnings("unchecked")
+	@Before
+	public void setupData() {
+		testTuple2Array = new Tuple2[2];
+		testTuple2Array[0] = new Tuple2<String[], int[]>(testStringArray1, testIntArray2);
+		testTuple2Array[1] = new Tuple2<String[], int[]>(testStringArray2, testIntArray1);
+
+		testTuple2 = new Tuple2<String[], int[]>(testStringArray3, testIntArray3);
+
+		testData = new Tuple2<Tuple2<String[], int[]>, Tuple2<String[], int[]>[]>(testTuple2,
+				testTuple2Array);
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	public void test1() {
+		Extractor ext = new ConcatenatedExtract(new FieldFromTuple(0), new FieldFromTuple(1))
+				.add(new FieldsFromArray(Integer.class, 2, 1, 0));
+		int[] expected = { testIntArray3[2], testIntArray3[1], testIntArray3[0] };
+		assertEquals(new Integer(expected[0]), ((Integer[]) ext.extract(testData))[0]);
+		assertEquals(new Integer(expected[1]), ((Integer[]) ext.extract(testData))[1]);
+		assertEquals(new Integer(expected[2]), ((Integer[]) ext.extract(testData))[2]);
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void test2() {
+		Extractor ext = new ConcatenatedExtract(new FieldFromTuple(1), // Tuple2<String[],int[]>[]
+				new FieldsFromArray(Tuple2.class, 1)) // Tuple2<String[],int[]>[]
+				.add(new FieldFromArray(0)) // Tuple2<String[],int[]>
+				.add(new ArrayFromTuple(0)) // Object[] (Containing String[])
+				.add(new FieldFromArray(0)) // String[]
+				.add(new FieldFromArray(1)); // String
+
+		String expected2 = testStringArray2[1];
+		assertEquals(expected2, ext.extract(testData));
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java
new file mode 100644
index 0000000..d274f4e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArrayTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class FieldFromArrayTest {
+
+	String[] testStringArray = { "0", "1", "2", "3", "4" };
+	Integer[] testIntegerArray = { 10, 11, 12, 13, 14 };
+	int[] testIntArray = { 20, 21, 22, 23, 24 };
+
+	@Test
+	public void testStringArray() {
+		for (int i = 0; i < this.testStringArray.length; i++) {
+			assertEquals(this.testStringArray[i],
+					new FieldFromArray<String>(i).extract(testStringArray));
+		}
+	}
+
+	@Test
+	public void testIntegerArray() {
+		for (int i = 0; i < this.testIntegerArray.length; i++) {
+			assertEquals(this.testIntegerArray[i],
+					new FieldFromArray<String>(i).extract(testIntegerArray));
+		}
+	}
+
+	@Test
+	public void testIntArray() {
+		for (int i = 0; i < this.testIntArray.length; i++) {
+			assertEquals(new Integer(this.testIntArray[i]),
+					new FieldFromArray<Integer>(i).extract(testIntArray));
+		}
+	}
+
+}