You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:16:43 UTC

[07/10] flink git commit: [FLINK-2780] Remove Old Windowing Logic and API

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
deleted file mode 100644
index 529850f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowSerializer.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing;
-
-import java.io.IOException;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-public final class StreamWindowSerializer<T> extends TypeSerializer<StreamWindow<T>> {
-
-	private static final long serialVersionUID = 1L;
-
-	private final TypeSerializer<T> typeSerializer;
-	TypeSerializer<Integer> intSerializer = IntSerializer.INSTANCE;
-	TypeSerializer<Boolean> boolSerializer = BooleanSerializer.INSTANCE;
-
-	public StreamWindowSerializer(TypeInformation<T> typeInfo, ExecutionConfig conf) {
-		Preconditions.checkNotNull(typeInfo);
-
-		this.typeSerializer = typeInfo.createSerializer(conf);
-	}
-
-	public TypeSerializer<T> getObjectSerializer() {
-		return typeSerializer;
-	}
-
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public StreamWindow<T> createInstance() {
-		return new StreamWindow<T>(0, 0);
-	}
-
-	@Override
-	public StreamWindow<T> copy(StreamWindow<T> from) {
-		return new StreamWindow<T>(from, typeSerializer);
-	}
-
-	@Override
-	public StreamWindow<T> copy(StreamWindow<T> from, StreamWindow<T> reuse) {
-		reuse.clear();
-		reuse.windowID = from.windowID;
-		reuse.numberOfParts = from.numberOfParts;
-		for (T element : from) {
-			reuse.add(typeSerializer.copy(element));
-		}
-		return reuse;
-	}
-
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	@Override
-	public void serialize(StreamWindow<T> window, DataOutputView target) throws IOException {
-
-		intSerializer.serialize(window.windowID, target);
-		intSerializer.serialize(window.numberOfParts, target);
-
-		intSerializer.serialize(window.size(), target);
-
-		for (T element : window) {
-			typeSerializer.serialize(element, target);
-		}
-	}
-
-	@Override
-	public StreamWindow<T> deserialize(DataInputView source) throws IOException {
-		return deserialize(createInstance(), source);
-	}
-
-	@Override
-	public StreamWindow<T> deserialize(StreamWindow<T> reuse, DataInputView source)
-			throws IOException {
-
-		StreamWindow<T> window = reuse;
-		window.clear();
-
-		window.windowID = intSerializer.deserialize(source);
-		window.numberOfParts = intSerializer.deserialize(source);
-
-		int size = intSerializer.deserialize(source);
-
-		for (int i = 0; i < size; i++) {
-			window.add(typeSerializer.deserialize(source));
-		}
-
-		return window;
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		serialize(deserialize(source), target);
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof StreamWindowSerializer) {
-			StreamWindowSerializer<?> other = (StreamWindowSerializer<?>) obj;
-
-			return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof StreamWindowSerializer;
-	}
-
-	@Override
-	public int hashCode() {
-		return typeSerializer.hashCode();
-	}
-
-	@Override
-	public TypeSerializer<StreamWindow<T>> duplicate() {
-		return this;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfo.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfo.java
deleted file mode 100644
index 2c0a999..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindowTypeInfo.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUStreamRecord<?>WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class StreamWindowTypeInfo<T> extends TypeInformation<StreamWindow<T>> {
-
-	private static final long serialVersionUID = 1L;
-
-	final TypeInformation<T> innerType;
-
-	public StreamWindowTypeInfo(TypeInformation<T> innerType) {
-		this.innerType = Preconditions.checkNotNull(innerType);
-	}
-
-	public TypeInformation<T> getInnerType() {
-		return innerType;
-	}
-
-	@Override
-	public boolean isBasicType() {
-		return innerType.isBasicType();
-	}
-
-	@Override
-	public boolean isTupleType() {
-		return innerType.isTupleType();
-	}
-
-	@Override
-	public int getArity() {
-		return innerType.getArity();
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public Class<StreamWindow<T>> getTypeClass() {
-		return (Class<StreamWindow<T>>)(Object)StreamWindow.class;
-	}
-
-	@Override
-	public boolean isKeyType() {
-		return innerType.isKeyType();
-	}
-
-	@Override
-	public TypeSerializer<StreamWindow<T>> createSerializer(ExecutionConfig conf) {
-		return new StreamWindowSerializer<T>(innerType, conf);
-	}
-
-	@Override
-	public String toString() {
-		return getClass().getSimpleName() + "<" + innerType + ">";
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof StreamWindowTypeInfo) {
-			@SuppressWarnings("unchecked")
-			StreamWindowTypeInfo<T> streamWindowTypeInfo = (StreamWindowTypeInfo<T>) obj;
-
-			return streamWindowTypeInfo.canEqual(this) &&
-				innerType.equals(streamWindowTypeInfo.innerType);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public int hashCode() {
-		return innerType.hashCode();
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof StreamWindowTypeInfo;
-	}
-
-	@Override
-	public int getTotalFields() {
-		return innerType.getTotalFields();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowEvent.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowEvent.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowEvent.java
deleted file mode 100644
index 359dfb3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowEvent.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-/**
- * Type representing events sent to the window buffer. The first field should
- * contain the window element the second field encodes triggers and evictions if
- * the second field is greater than 0 it represents an eviction if it equals -1
- * it represents a trigger.
- */
-public class WindowEvent<T> extends Tuple2<T, Integer> {
-	private static final long serialVersionUID = 1L;
-
-	public boolean isElement() {
-		return f1 == 0;
-	}
-
-	public boolean isEviction() {
-		return f1 > 0;
-	}
-
-	public boolean isTrigger() {
-		return f1 == -1;
-	}
-
-	public Integer getEviction() {
-		return f1;
-	}
-
-	public T getElement() {
-		return f0;
-	}
-
-	public WindowEvent<T> setElement(T element) {
-		f0 = element;
-		f1 = 0;
-		return this;
-	}
-
-	public WindowEvent<T> setTrigger() {
-		f1 = -1;
-		return this;
-	}
-
-	public WindowEvent<T> setEviction(Integer n) {
-		if (n > 0) {
-			f1 = n;
-			return this;
-		} else {
-			throw new RuntimeException("Must evict at least 1");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
deleted file mode 100644
index a899b74..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/WindowUtils.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.KeepAllEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-
-/**
- * Utility class that contains helper methods to work with stream windowing.
- */
-public class WindowUtils {
-
-	public enum WindowTransformation {
-		REDUCEWINDOW, MAPWINDOW, FOLDWINDOW, NONE;
-		private Function UDF;
-
-		public WindowTransformation with(Function UDF) {
-			this.UDF = UDF;
-			return this;
-		}
-
-		public Function getUDF() {
-			return UDF;
-		}
-	}
-
-	public static boolean isParallelPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction,
-			int parallelism) {
-		return ((eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy))
-				|| (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy) || (WindowUtils
-				.isTimeOnly(trigger, eviction) && parallelism > 1));
-	}
-
-	public static boolean isSlidingTimePolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
-		if (isTimeOnly(trigger, eviction)) {
-			long slide = getSlideSize(trigger);
-			long window = getWindowSize(eviction);
-
-			return slide < window
-					&& getTimeStampWrapper(trigger).equals(getTimeStampWrapper(eviction));
-		} else {
-			return false;
-		}
-	}
-
-	public static boolean isSlidingCountPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
-		if (isCountOnly(trigger, eviction)) {
-			long slide = getSlideSize(trigger);
-			long window = getWindowSize(eviction);
-
-			return slide < window
-					&& ((CountTriggerPolicy<?>) trigger).getStart() == ((CountEvictionPolicy<?>) eviction)
-							.getStart()
-					&& ((CountEvictionPolicy<?>) eviction).getDeleteOnEviction() == 1;
-		} else {
-			return false;
-		}
-	}
-
-	public static <X> TimestampWrapper<X> getTimeStampWrapper(TriggerPolicy<X> trigger) {
-		if (trigger instanceof TimeTriggerPolicy) {
-			return ((TimeTriggerPolicy<X>) trigger).getTimeStampWrapper();
-		} else {
-			throw new IllegalArgumentException(
-					"Timestamp wrapper can only be accessed for time policies");
-		}
-	}
-
-	public static <X> TimestampWrapper<X> getTimeStampWrapper(EvictionPolicy<X> eviction) {
-		if (eviction instanceof EvictionPolicy) {
-			return ((TimeEvictionPolicy<X>) eviction).getTimeStampWrapper();
-		} else {
-			throw new IllegalArgumentException(
-					"Timestamp wrapper can only be accessed for time policies");
-		}
-	}
-
-	public static long getSlideSize(TriggerPolicy<?> trigger) {
-		if (trigger instanceof TimeTriggerPolicy) {
-			return ((TimeTriggerPolicy<?>) trigger).getSlideSize();
-		} else if (trigger instanceof CountTriggerPolicy) {
-			return ((CountTriggerPolicy<?>) trigger).getSlideSize();
-		} else {
-			throw new IllegalArgumentException(
-					"Slide size can only be accessed for time or count policies");
-		}
-	}
-
-	public static long getWindowSize(EvictionPolicy<?> eviction) {
-		if (eviction instanceof TimeEvictionPolicy) {
-			return ((TimeEvictionPolicy<?>) eviction).getWindowSize();
-		} else if (eviction instanceof CountEvictionPolicy) {
-			return ((CountEvictionPolicy<?>) eviction).getWindowSize();
-		} else {
-			throw new IllegalArgumentException(
-					"Window size can only be accessed for time or count policies");
-		}
-	}
-
-	public static boolean isTumblingPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
-		if (eviction instanceof TumblingEvictionPolicy || eviction instanceof KeepAllEvictionPolicy) {
-			return true;
-		} else if (isTimeOnly(trigger, eviction)) {
-			long slide = getSlideSize(trigger);
-			long window = getWindowSize(eviction);
-
-			return slide == window
-					&& getTimeStampWrapper(trigger).equals(getTimeStampWrapper(eviction));
-		} else if (isCountOnly(trigger, eviction)) {
-			long slide = getSlideSize(trigger);
-			long window = getWindowSize(eviction);
-
-			return slide == window
-					&& ((CountTriggerPolicy<?>) trigger).getStart() == ((CountEvictionPolicy<?>) eviction)
-							.getStart()
-					&& ((CountEvictionPolicy<?>) eviction).getDeleteOnEviction() == 1;
-		} else {
-			return false;
-		}
-	}
-
-	public static boolean isTimeOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
-		return trigger instanceof TimeTriggerPolicy
-				&& (eviction instanceof TimeEvictionPolicy || eviction instanceof KeepAllEvictionPolicy);
-	}
-
-	public static boolean isCountOnly(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
-		return trigger instanceof CountTriggerPolicy && eviction instanceof CountEvictionPolicy;
-	}
-
-	public static boolean isSystemTimeTrigger(TriggerPolicy<?> trigger) {
-		return trigger instanceof TimeTriggerPolicy
-				&& ((TimeTriggerPolicy<?>) trigger).timestampWrapper.isDefaultTimestamp();
-	}
-
-	public static class WindowKey<R> implements KeySelector<StreamWindow<R>, Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer getKey(StreamWindow<R> value) throws Exception {
-			return value.windowID;
-		}
-
-	}
-
-	public static boolean isJumpingCountPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
-		if (isCountOnly(trigger, eviction)) {
-			long slide = getSlideSize(trigger);
-			long window = getWindowSize(eviction);
-
-			return slide > window
-					&& ((CountTriggerPolicy<?>) trigger).getStart() == ((CountEvictionPolicy<?>) eviction)
-							.getStart()
-					&& ((CountEvictionPolicy<?>) eviction).getDeleteOnEviction() == 1;
-		} else {
-			return false;
-		}
-	}
-
-	public static boolean isJumpingTimePolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
-		if (isTimeOnly(trigger, eviction)) {
-			long slide = getSlideSize(trigger);
-			long window = getWindowSize(eviction);
-
-			return slide > window
-					&& getTimeStampWrapper(trigger).equals(getTimeStampWrapper(eviction));
-		} else {
-			return false;
-		}
-	}
-
-	/**
-	 * Private constructor to prevent instantiation.
-	 */
-	private WindowUtils() {
-		throw new RuntimeException();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
index 5004c42..5776d8d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.windowing.evictors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -40,7 +41,7 @@ public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
 	@Override
 	public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
 		int toEvict = 0;
-		long currentTime = System.currentTimeMillis();
+		long currentTime = Iterables.getLast(elements).getTimestamp();
 		long evictCutoff = currentTime - windowSize;
 		for (StreamRecord<Object> record: elements) {
 			if (record.getTimestamp() > evictCutoff) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTuple.java
deleted file mode 100644
index ee878ac..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ArrayFromTuple.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Converts a Tuple to an Object-Array. The field which should be included in
- * the array can selected and reordered as needed.
- */
-public class ArrayFromTuple implements Extractor<Tuple, Object[]> {
-
-	/**
-	 * Auto generated version id
-	 */
-	private static final long serialVersionUID = -6076121226427616818L;
-	int[] order = null;
-
-	/**
-	 * Using this constructor the extractor will convert the whole tuple (all
-	 * fields in the original order) to an array.
-	 */
-	public ArrayFromTuple() {
-		// noting to do
-	}
-
-	/**
-	 * Using this constructor the extractor will combine the fields as specified
-	 * in the indexes parameter in an object array.
-	 * 
-	 * @param indexes
-	 *            the field ids (enumerated from 0)
-	 */
-	public ArrayFromTuple(int... indexes) {
-		this.order = indexes;
-	}
-
-	@Override
-	public Object[] extract(Tuple in) {
-		Object[] output;
-
-		if (order == null) {
-			// copy the whole tuple
-			output = new Object[in.getArity()];
-			for (int i = 0; i < in.getArity(); i++) {
-				output[i] = in.getField(i);
-			}
-		} else {
-			// copy user specified order
-			output = new Object[order.length];
-			for (int i = 0; i < order.length; i++) {
-				output[i] = in.getField(order[i]);
-			}
-		}
-
-		return output;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ConcatenatedExtract.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ConcatenatedExtract.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ConcatenatedExtract.java
deleted file mode 100644
index a220abe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/ConcatenatedExtract.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-/**
- * Combines two extractors which will be executed one after each other.
- *
- * @param <FROM>
- *            The input type of the first extractor.
- * @param <OVER>
- *            The output type of the first and the input type of the second
- *            extractor.
- * @param <TO>
- *            The output type of the second extractor and the output type of the
- *            over all extraction.
- */
-public class ConcatenatedExtract<FROM, OVER, TO> implements Extractor<FROM, TO> {
-
-	/**
-	 * auto-generated id
-	 */
-	private static final long serialVersionUID = -7807197760725651752L;
-
-	private Extractor<FROM, OVER> e1;
-	private Extractor<OVER, TO> e2;
-
-	/**
-	 * Combines two extractors which will be executed one after each other.
-	 * 
-	 * @param e1
-	 *            First extractor: This extractor gets applied to the input data
-	 *            first. Its output as then passed as input to the second
-	 *            extractor.
-	 * @param e2
-	 *            Second extractor: This extractor gets the output of the first
-	 *            extractor as input. Its output is then the result of the over
-	 *            all extraction.
-	 */
-	public ConcatenatedExtract(Extractor<FROM, OVER> e1, Extractor<OVER, TO> e2) {
-		this.e1 = e1;
-		this.e2 = e2;
-	}
-
-	@Override
-	public TO extract(FROM in) {
-		return e2.extract(e1.extract(in));
-	}
-
-	public <OUT> ConcatenatedExtract<FROM, TO, OUT> add(Extractor<TO, OUT> e3) {
-		return new ConcatenatedExtract<FROM, TO, OUT>(this, e3);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/Extractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/Extractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/Extractor.java
deleted file mode 100644
index b103ca3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/Extractor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import java.io.Serializable;
-
-/**
- * Extractors allow to extract/convert one type to another. They are mostly used
- * to extract some fields out of a more complex structure (Tuple/Array) to run
- * further calculation on the extraction result.
- * 
- * @param <FROM>
- *            The input data type.
- * @param <TO>
- *            The output data type.
- */
-public interface Extractor<FROM, TO> extends Serializable {
-
-	/**
-	 * Extracts/Converts the given input to an object of the output type
-	 * 
-	 * @param in
-	 *            the input data
-	 * @return the extracted/converted data
-	 */
-	public TO extract(FROM in);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArray.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArray.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArray.java
deleted file mode 100644
index 0568276..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromArray.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import java.lang.reflect.Array;
-
-/**
- * Extracts a single field out of an array.
- * 
- * @param <OUT>
- *            The type of the extracted field.
- */
-public class FieldFromArray<OUT> implements Extractor<Object, OUT> {
-
-	/**
-	 * Auto-gernated version id
-	 */
-	private static final long serialVersionUID = -5161386546695574359L;
-	private int fieldId = 0;
-
-	/**
-	 * Extracts the first field (id 0) from the array
-	 */
-	public FieldFromArray() {
-		// noting to do => will use default 0
-	}
-
-	/**
-	 * Extracts the field with the given id from the array.
-	 * 
-	 * @param fieldId
-	 *            The id of the field which will be extracted from the array.
-	 */
-	public FieldFromArray(int fieldId) {
-		this.fieldId = fieldId;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public OUT extract(Object in) {
-		return (OUT) Array.get(in, fieldId);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTuple.java
deleted file mode 100644
index 07b38f5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldFromTuple.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Extracts a single field out of a tuple.
- * 
- * @param <OUT>
- *            The type of the extracted field.
- */
-public class FieldFromTuple<OUT> implements Extractor<Tuple, OUT> {
-
-	/**
-	 * Auto-gernated version id
-	 */
-	private static final long serialVersionUID = -5161386546695574359L;
-	private int fieldId = 0;
-
-	/**
-	 * Extracts the first field (id 0) from the tuple
-	 */
-	public FieldFromTuple() {
-		// noting to do => will use default 0
-	}
-
-	/**
-	 * Extracts the field with the given id from the tuple.
-	 * 
-	 * @param fieldId
-	 *            The id of the field which will be extracted from the tuple.
-	 */
-	public FieldFromTuple(int fieldId) {
-		this.fieldId = fieldId;
-	}
-
-	@Override
-	public OUT extract(Tuple in) {
-		return in.getField(fieldId);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArray.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArray.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArray.java
deleted file mode 100644
index 4e98689..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromArray.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import java.lang.reflect.Array;
-
-/**
- * Extracts multiple fields from an array and puts them into a new array of the
- * specified type.
- *
- * @param <OUT>
- *            The type of the output array. If out is set to String, the output
- *            of the extractor will be a String[]. If it is set to String[] the
- *            output will be String[][].
- */
-public class FieldsFromArray<OUT> implements Extractor<Object, OUT[]> {
-
-	/**
-	 * Auto-generated version id
-	 */
-	private static final long serialVersionUID = 8075055384516397670L;
-	private int[] order;
-	private Class<OUT> clazz;
-
-	/**
-	 * Extracts multiple fields from an array and puts them in the given order
-	 * into a new array of the specified type.
-	 * 
-	 * @param clazz
-	 *            the Class object representing the component type of the new
-	 *            array
-	 * @param indexes
-	 *            The indexes of the fields to be extracted. Any order is
-	 *            possible, but not more than 255 fields due to limitations in
-	 *            {@link Array#newInstance(Class, int...)}.
-	 */
-	public FieldsFromArray(Class<OUT> clazz, int... indexes) {
-		this.order = indexes;
-		this.clazz = clazz;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public OUT[] extract(Object in) {
-		OUT[] output = (OUT[]) Array.newInstance(clazz, order.length);
-		for (int i = 0; i < order.length; i++) {
-			output[i] = (OUT) Array.get(in, this.order[i]);
-		}
-		return output;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTuple.java
deleted file mode 100644
index 1bfc461..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/extractor/FieldsFromTuple.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.extractor;
-
-import org.apache.flink.api.java.tuple.Tuple;
-
-/**
- * Extracts one or more fields of the type Double from a tuple and puts them
- * into a new double[]
- */
-public class FieldsFromTuple implements Extractor<Tuple, double[]> {
-
-	/**
-	 * auto generated version id
-	 */
-	private static final long serialVersionUID = -2554079091050273761L;
-	int[] indexes;
-
-	/**
-	 * Extracts one or more fields of the the type Double from a tuple and puts
-	 * them into a new double[] (in the specified order).
-	 * 
-	 * @param indexes
-	 *            The indexes of the fields to be extracted.
-	 */
-	public FieldsFromTuple(int... indexes) {
-		this.indexes = indexes;
-	}
-
-	@Override
-	public double[] extract(Tuple in) {
-		double[] out = new double[indexes.length];
-		for (int i = 0; i < indexes.length; i++) {
-			out[i] = (Double) in.getField(indexes[i]);
-		}
-		return out;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
deleted file mode 100644
index 3266a24..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-/**
- * Represents a count based trigger or eviction policy. Use the
- * {@link Count#of(int)} to get an instance.
- */
-@SuppressWarnings("rawtypes")
-public class Count extends WindowingHelper {
-
-	private int count;
-	private int deleteOnEviction = 1;
-	private int startValue = CountTriggerPolicy.DEFAULT_START_VALUE;
-
-	/**
-	 * Specifies on which element a trigger or an eviction should happen (based
-	 * on the count of the elements).
-	 * 
-	 * This constructor does exactly the same as {@link Count#of(int)}.
-	 * 
-	 * @param count
-	 *            the number of elements to count before trigger/evict
-	 */
-	public Count(int count) {
-		this.count = count;
-	}
-
-	@Override
-	public EvictionPolicy<?> toEvict() {
-		return new CountEvictionPolicy(count, deleteOnEviction);
-	}
-
-	@Override
-	public TriggerPolicy<?> toTrigger() {
-		return new CountTriggerPolicy(count, startValue);
-	}
-
-	/**
-	 * Sets the number of elements deleted at each eviction (i.e when the number
-	 * elements exceeds the window size). By default the elements get deleted
-	 * one by one (deleteOnEvition = 1)
-	 * 
-	 * @param deleteOnEviction
-	 *            The number of elements deleted at each evition
-	 * @return Helper representing the count based policy
-	 * 
-	 */
-	public Count withDelete(int deleteOnEviction) {
-		this.deleteOnEviction = deleteOnEviction;
-		return this;
-	}
-
-	/**
-	 * Sets the initial value of the counter. 0 by default
-	 * 
-	 * @param startValue
-	 *            Starting value of the window counter
-	 * @return Helper representing the count based policy
-	 * 
-	 */
-	public Count startingAt(int startValue) {
-		this.startValue = startValue;
-		return this;
-	}
-
-	/**
-	 * Specifies a count based eviction (window size) or trigger policy (slide
-	 * size). For eviction 'count' defines the number of elements in each
-	 * window. For trigger 'count' defines how often do we call the user
-	 * function in terms of number of elements received.
-	 * 
-	 * @param count
-	 *            the number of elements to count before trigger/evict
-	 * @return Helper representing the count based policy
-	 */
-	public static Count of(int count) {
-		return new Count(count);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
deleted file mode 100644
index 31063ab..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.policy.DeltaPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-/**
- * This helper represents a trigger or eviction policy based on a
- * {@link DeltaFunction}.
- * 
- * @param <DATA>
- *            the data type handled by the delta function represented by this
- *            helper.
- */
-public class Delta<DATA> extends WindowingHelper<DATA> {
-
-	private DeltaFunction<DATA> deltaFunction;
-	private DATA initVal;
-	private double threshold;
-	private TypeSerializer<DATA> typeSerializer;
-
-	/**
-	 * Creates a delta helper representing a delta count or eviction policy
-	 * @param deltaFunction
-	 *				The delta function which should be used to calculate the delta
-	 *				points.
-	 * @param initVal
-	 *				The initial value which will be used to calculate the first
-	 *				delta.
-	 * @param threshold
-	 * 				The threshold used by the delta function.
-	 */
-	public Delta(DeltaFunction<DATA> deltaFunction, DATA initVal, double threshold) {
-		this.deltaFunction = deltaFunction;
-		this.initVal = initVal;
-		this.threshold = threshold;
-	}
-
-	@Override
-	public EvictionPolicy<DATA> toEvict() {
-		instantiateTypeSerializer();
-		return new DeltaPolicy<DATA>(deltaFunction, initVal, threshold, typeSerializer);
-	}
-
-	@Override
-	public TriggerPolicy<DATA> toTrigger() {
-		instantiateTypeSerializer();
-		return new DeltaPolicy<DATA>(deltaFunction, initVal, threshold, typeSerializer);
-	}
-
-	/**
-	 * Creates a delta helper representing a delta trigger or eviction policy.
-	 * </br></br> This policy calculates a delta between the data point which
-	 * triggered last and the currently arrived data point. It triggers if the
-	 * delta is higher than a specified threshold. </br></br> In case it gets
-	 * used for eviction, this policy starts from the first element of the
-	 * buffer and removes all elements from the buffer which have a higher delta
-	 * then the threshold. As soon as there is an element with a lower delta,
-	 * the eviction stops.
-	 *
-	 * @param deltaFunction
-	 *				The delta function which should be used to calculate the delta
-	 *				points.
-	 * @param initVal
-	 *				The initial value which will be used to calculate the first
-	 *				delta.
-	 * @param threshold
-	 * 				The threshold used by the delta function.
-	 * @return Helper representing a delta trigger or eviction policy
-	 */
-	public static <DATA> Delta<DATA> of(double threshold, DeltaFunction<DATA> deltaFunction,
-			DATA initVal) {
-		return new Delta<DATA>(deltaFunction, initVal, threshold);
-	}
-
-	@SuppressWarnings("unchecked")
-	private void instantiateTypeSerializer(){
-		if (executionConfig == null){
-			throw new UnsupportedOperationException("ExecutionConfig has to be set to instantiate TypeSerializer.");
-		}
-		TypeInformation typeInformation = TypeExtractor.getForObject(initVal);
-		typeSerializer = typeInformation.createSerializer(executionConfig);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java
deleted file mode 100644
index 7773d9a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/FullStream.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.KeepAllEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-/**
- * Window that represents the full stream history. Can be used only as eviction
- * policy and only with operations that support pre-aggregator such as reduce or
- * aggregations.
- */
-public class FullStream<DATA> extends WindowingHelper<DATA> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	private FullStream() {
-	}
-
-	@Override
-	public EvictionPolicy<DATA> toEvict() {
-		return new KeepAllEvictionPolicy<DATA>();
-	}
-
-	@Override
-	public TriggerPolicy<DATA> toTrigger() {
-		throw new RuntimeException(
-				"Full stream policy can be only used as eviction. Use .every(..) after the window call.");
-	}
-
-	/**
-	 * Returns a helper representing an eviction that keeps all previous record
-	 * history.
-	 */
-	public static <R> FullStream<R> window() {
-		return new FullStream<R>();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java
deleted file mode 100644
index 8581ac5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-/**
- * {@link Timestamp} implementation to be used when system time is needed to
- * determine windows
- */
-public class SystemTimestamp<T> implements Timestamp<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public long getTimestamp(T value) {
-		return System.currentTimeMillis();
-	}
-
-	public static <R> TimestampWrapper<R> getWrapper() {
-		return new TimestampWrapper<R>(new SystemTimestamp<R>(), System.currentTimeMillis());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
deleted file mode 100644
index 022f975..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-/**
- * This helper represents a time based count or eviction policy. By default the
- * time is measured with {@link System#currentTimeMillis()} in
- * {@link SystemTimestamp}.
- * 
- * @param <DATA>
- *            The data type which is handled by the time stamp used in the
- *            policy represented by this helper
- */
-public class Time<DATA> extends WindowingHelper<DATA> {
-
-	protected long length;
-	protected TimeUnit granularity;
-	protected TimestampWrapper<DATA> timestampWrapper;
-	protected long delay;
-
-	/**
-	 * Creates a helper representing a trigger which triggers every given
-	 * length or an eviction which evicts all elements older than length.
-	 * 
-	 * @param length
-	 *            The number of time units
-	 * @param timeUnit
-	 *            The unit of time such as minute oder millisecond. Note that
-	 *            the smallest possible granularity is milliseconds. Any smaller
-	 *            time unit might cause an error at runtime due to conversion
-	 *            problems.
-	 * @param timestamp
-	 *            The user defined timestamp that will be used to extract time
-	 *            information from the incoming elements
-	 * @param startTime
-	 *            The startTime of the stream for computing the first window
-	 */
-	private Time(long length, TimeUnit timeUnit, Timestamp<DATA> timestamp, long startTime) {
-		this(length, timeUnit, new TimestampWrapper<DATA>(timestamp, startTime));
-	}
-
-	/**
-	 * Creates a helper representing a trigger which triggers every given
-	 * length or an eviction which evicts all elements older than length.
-	 * 
-	 * @param length
-	 *            The number of time units
-	 * @param timeUnit
-	 *            The unit of time such as minute oder millisecond. Note that
-	 *            the smallest possible granularity is milliseconds. Any smaller
-	 *            time unit might cause an error at runtime due to conversion
-	 *            problems.
-	 * @param timestampWrapper
-	 *            The user defined {@link TimestampWrapper} that will be used to
-	 *            extract time information from the incoming elements
-	 */
-	private Time(long length, TimeUnit timeUnit, TimestampWrapper<DATA> timestampWrapper) {
-		this.length = length;
-		this.granularity = timeUnit;
-		this.timestampWrapper = timestampWrapper;
-	}
-
-	@Override
-	public EvictionPolicy<DATA> toEvict() {
-		return new TimeEvictionPolicy<DATA>(granularityInMillis(), timestampWrapper);
-	}
-
-	@Override
-	public TriggerPolicy<DATA> toTrigger() {
-		return new TimeTriggerPolicy<DATA>(granularityInMillis(), timestampWrapper);
-	}
-
-	/**
-	 * Creates a helper representing a time trigger which triggers every given
-	 * length (slide size) or a time eviction which evicts all elements older
-	 * than length (window size) using System time.
-	 * 
-	 * @param length
-	 *            The number of time units
-	 * @param timeUnit
-	 *            The unit of time such as minute oder millisecond. Note that
-	 *            the smallest possible granularity is milliseconds. Any smaller
-	 *            time unit might cause an error at runtime due to conversion
-	 *            problems.
-	 * @return Helper representing the time based trigger and eviction policy
-	 */
-	@SuppressWarnings("unchecked")
-	public static <DATA> Time<DATA> of(long length, TimeUnit timeUnit) {
-		return new Time<DATA>(length, timeUnit,
-				(TimestampWrapper<DATA>) SystemTimestamp.getWrapper());
-	}
-
-	/**
-	 * Creates a helper representing a time trigger which triggers every given
-	 * length (slide size) or a time eviction which evicts all elements older
-	 * than length (window size) using a user defined timestamp extractor.
-	 * 
-	 * @param length
-	 *            The number of time units
-	 * @param timestamp
-	 *            The user defined timestamp that will be used to extract time
-	 *            information from the incoming elements
-	 * @param startTime
-	 *            The startTime used to compute the first window
-	 * @return Helper representing the time based trigger and eviction policy
-	 */
-	public static <DATA> Time<DATA> of(long length, Timestamp<DATA> timestamp, long startTime) {
-		return new Time<DATA>(length, null, timestamp, startTime);
-	}
-
-	/**
-	 * Creates a helper representing a time trigger which triggers every given
-	 * length (slide size) or a time eviction which evicts all elements older
-	 * than length (window size) using a user defined timestamp extractor. By
-	 * default the start time is set to 0.
-	 * 
-	 * @param length
-	 *            The number of time units
-	 * @param timestamp
-	 *            The user defined timestamp that will be used to extract time
-	 *            information from the incoming elements
-	 * @return Helper representing the time based trigger and eviction policy
-	 */
-	public static <DATA> Time<DATA> of(long length, Timestamp<DATA> timestamp) {
-		return of(length, timestamp, 0);
-	}
-	
-	protected long granularityInMillis() {
-		return granularity == null ? length : granularity.toMillis(length);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java
deleted file mode 100644
index fea6020..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import java.io.Serializable;
-
-/**
- * Interface for getting a timestamp from a custom value. Used in window
- * reduces. In order to work properly, the timestamps must be non-decreasing.
- *
- * @param <T>
- *            Type of the value to create the timestamp from.
- */
-public interface Timestamp<T> extends Serializable {
-
-	/**
-	 * Values
-	 * 
-	 * @param value
-	 *            The value to create the timestamp from
-	 * @return The timestamp
-	 */
-	public long getTimestamp(T value);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
deleted file mode 100644
index c2ec7c2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import java.io.Serializable;
-
-public class TimestampWrapper<T> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	private long startTime;
-	private Timestamp<T> timestamp;
-
-	public TimestampWrapper(Timestamp<T> timeStamp, long startTime) {
-		this.timestamp = timeStamp;
-		this.startTime = startTime;
-	}
-
-	public long getTimestamp(T in) {
-		return timestamp.getTimestamp(in);
-	}
-
-	public long getStartTime() {
-		return startTime;
-	}
-
-	public boolean isDefaultTimestamp() {
-		return timestamp instanceof SystemTimestamp;
-	}
-
-	@Override
-	public boolean equals(Object other) {
-		if (other == null || !(other instanceof TimestampWrapper)) {
-			return false;
-		} else {
-			try {
-				@SuppressWarnings("unchecked")
-				TimestampWrapper<T> otherTSW = (TimestampWrapper<T>) other;
-				if (timestamp instanceof SystemTimestamp
-						&& otherTSW.timestamp instanceof SystemTimestamp) {
-					return true;
-				} else {
-					return startTime == otherTSW.startTime
-							&& timestamp.getClass() == otherTSW.timestamp.getClass();
-				}
-			} catch (ClassCastException e) {
-				return false;
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
deleted file mode 100644
index 17e142a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/WindowingHelper.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.helper;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-
-/**
- * A helper representing a count or eviction policy. Such helper classes are
- * used to provide a nice and well readable API.
- * 
- * @param <DATA>
- *            the type of input data handled by this helper
- * @see Count
- * @see Time
- * @see Delta
- */
-public abstract class WindowingHelper<DATA> {
-
-	/**
-	 * Provides information for initial value serialization
-	 * in {@link Delta}, unused in other subclasses.
-	 */
-	protected ExecutionConfig executionConfig;
-
-	/**
-	 * Method for encapsulating the {@link EvictionPolicy}.
-	 * @return the eviction policy
-	 */
-	public abstract EvictionPolicy<DATA> toEvict();
-
-	/**
-	 * Method for encapsulating the {@link TriggerPolicy}.
-	 * @return the trigger policy
-	 */
-	public abstract TriggerPolicy<DATA> toTrigger();
-
-	/**
-	 * Setter for the {@link ExecutionConfig} field.
-	 * @param executionConfig Desired value
-	 */
-	public final void setExecutionConfig(ExecutionConfig executionConfig){
-		this.executionConfig = executionConfig;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java
deleted file mode 100644
index 29ba9eb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveCloneableEvictionPolicyWrapper.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * The {@link ActiveEvictionPolicy} wraps around a non active
- * {@link EvictionPolicy}. It forwards all calls to
- * {@link ActiveEvictionPolicy#notifyEvictionWithFakeElement(Object, int)} to
- * {@link EvictionPolicy#notifyEviction(Object, boolean, int)} while the
- * triggered parameter will be set to true.
- * 
- * This class additionally implements the clone method and can wrap around
- * {@link CloneableEvictionPolicy} to make it active.
- * 
- * @param <DATA>
- *            The data type handled by this policy
- */
-public class ActiveCloneableEvictionPolicyWrapper<DATA> extends ActiveEvictionPolicyWrapper<DATA>
-		implements CloneableEvictionPolicy<DATA> {
-
-	/**
-	 * Auto generated version ID
-	 */
-	private static final long serialVersionUID = 1520261575300622769L;
-	CloneableEvictionPolicy<DATA> nestedPolicy;
-
-	/**
-	 * Creates a wrapper which activates the eviction policy which is wrapped
-	 * in. This means that the nested policy will get called on fake elements as
-	 * well as on real elements.
-	 * 
-	 * This specialized version of the {@link ActiveEvictionPolicyWrapper} works
-	 * with {@link CloneableEvictionPolicy} and is thereby cloneable as well.
-	 * 
-	 * @param nestedPolicy
-	 *            The policy which should be activated/wrapped in.
-	 */
-	public ActiveCloneableEvictionPolicyWrapper(CloneableEvictionPolicy<DATA> nestedPolicy) {
-		super(nestedPolicy);
-		this.nestedPolicy = nestedPolicy;
-	}
-
-	@Override
-	public ActiveCloneableEvictionPolicyWrapper<DATA> clone() {
-		return new ActiveCloneableEvictionPolicyWrapper<DATA>(nestedPolicy.clone());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
deleted file mode 100644
index fe172bc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicy.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * This interface is used for active eviction policies. beside the functionality
- * inherited from {@link EvictionPolicy} it provides a method which gets called
- * to notify on fake elements.
- * 
- * In case an eviction policy implements this interface instead of the
- * {@link EvictionPolicy} interface, not only the real but also fake data points
- * will cause a notification of the eviction.
- * 
- * Fake data points are mostly used in windowing based on time to trigger and
- * evict even if no element arrives at all during a windows duration.
- */
-public interface ActiveEvictionPolicy<DATA> extends EvictionPolicy<DATA> {
-
-	/**
-	 * Proves if and how many elements should be deleted from the element
-	 * buffer. The eviction takes place after the trigger and after the call to
-	 * the UDF. This method is only called with fake elements.
-	 * 
-	 * Note: Fake elements are always considered as triggered. Therefore this
-	 * method does not have a triggered parameter.
-	 * 
-	 * @param datapoint
-	 *            the current fake data point
-	 * @param bufferSize
-	 *            the current size of the buffer (only real elements are
-	 *            counted)
-	 * @return the number of elements to delete from the buffer (only real
-	 *         elements are counted)
-	 */
-	public int notifyEvictionWithFakeElement(Object datapoint, int bufferSize);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
deleted file mode 100644
index b3b6935..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveEvictionPolicyWrapper.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * This {@link ActiveEvictionPolicy} wraps around a non active
- * {@link EvictionPolicy}. It forwards all calls to
- * {@link ActiveEvictionPolicy#notifyEvictionWithFakeElement(Object, int)} to
- * {@link EvictionPolicy#notifyEviction(Object, boolean, int)} while the
- * triggered parameter will be set to true.
- * 
- * @param <DATA>
- *            The data type handled by this policy
- */
-public class ActiveEvictionPolicyWrapper<DATA> implements ActiveEvictionPolicy<DATA> {
-
-	/**
-	 * Auto generated version ID
-	 */
-	private static final long serialVersionUID = -7656558669799505882L;
-	private EvictionPolicy<DATA> nestedPolicy;
-
-	/**
-	 * Creates a wrapper which activates the eviction policy which is wrapped
-	 * in. This means that the nested policy will get called on fake elements as
-	 * well as on real elements.
-	 * 
-	 * @param nestedPolicy
-	 *            The policy which should be activated/wrapped in.
-	 */
-	public ActiveEvictionPolicyWrapper(EvictionPolicy<DATA> nestedPolicy) {
-		if (nestedPolicy == null) {
-			throw new RuntimeException("The nested policy must not be null.");
-		}
-		this.nestedPolicy = nestedPolicy;
-	}
-
-	@Override
-	public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) {
-		return nestedPolicy.notifyEviction(datapoint, triggered, bufferSize);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public int notifyEvictionWithFakeElement(Object datapoint, int bufferSize) {
-		return nestedPolicy.notifyEviction((DATA) datapoint, true, bufferSize);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
deleted file mode 100644
index c44be37..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerCallback.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * In case an {@link ActiveTriggerPolicy} is used, it can implement own
- * {@link Runnable} classes. Such {@link Runnable} classes will be executed as
- * an own thread and can submit fake elements, to the element buffer at any
- * time.
- * 
- * The factory method for runnables of the {@link ActiveTriggerPolicy} gets an
- * instance of this interface as parameter. The describes adding of elements can
- * be done by the runnable using the methods provided in this interface.
- * 
- */
-public interface ActiveTriggerCallback {
-
-	/**
-	 * Submits a new fake data point to the element buffer. Such a fake element
-	 * might be used to trigger at any time, but will never be included in the
-	 * result of the reduce function. The submission of a fake element causes
-	 * notifications only at the {@link ActiveTriggerPolicy} and
-	 * {@link ActiveEvictionPolicy} implementations.
-	 * 
-	 * @param datapoint
-	 *            the fake data point to be added
-	 */
-	public void sendFakeElement(Object datapoint);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
deleted file mode 100644
index b645c0f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-
-/**
- * This interface extends the {@link TriggerPolicy} interface with functionality
- * for active triggers. Active triggers can act in two ways:
- * 
- * 1) Whenever an element arrives at the operator, the
- * {@link ActiveTriggerPolicy#preNotifyTrigger(Object)} method gets called
- * first. It can return zero ore more fake data points which will be added
- * before the currently arrived real element gets processed. This allows to
- * handle empty windows in time based windowing with an user defined
- * {@link Timestamp}. Triggers are not called on fake datapoint. A fake
- * datapoint is always considered as triggered.
- * 
- * 2) An active trigger has a factory method for a runnable. This factory method
- * gets called at the start up of the operator. The returned runnable will be
- * executed in its own thread and can submit fake elements at any time through an
- * {@link ActiveTriggerCallback}. This allows to have time based triggers based
- * on any system internal time measure. Triggers are not called on fake
- * datapoint. A fake datapoints is always considered as triggered.
- * 
- * @param <DATA>
- *            The data type which can be handled by this policy
- */
-public interface ActiveTriggerPolicy<DATA> extends TriggerPolicy<DATA> {
-
-	/**
-	 * Whenever an element arrives at the operator, the
-	 * {@link ActiveTriggerPolicy#preNotifyTrigger(Object)} method gets called
-	 * first. It can return zero ore more fake data points which will be added
-	 * before the the currently arrived real element gets processed. This allows
-	 * to handle empty windows in time based windowing with an user defined
-	 * {@link Timestamp}. Triggers are not called on fake datapoints. A fake
-	 * datapoint is always considered as triggered.
-	 * 
-	 * @param datapoint
-	 *            the data point which arrived at the operator
-	 * @return zero ore more fake data points which will be added before the the
-	 *         currently arrived real element gets processed.
-	 */
-	public Object[] preNotifyTrigger(DATA datapoint);
-
-	/**
-	 * This is the factory method for a runnable. This factory method gets
-	 * called at the start up of the operator. The returned runnable will be
-	 * executed in its own thread and can submit fake elements at any time through
-	 * an {@link ActiveTriggerCallback}. This allows to have time based triggers
-	 * based on any system internal time measure. Triggers are not called on
-	 * fake datapoints. A fake datapoint is always considered as triggered.
-	 * 
-	 * @param callback
-	 *            A callback object which allows to add fake elements from
-	 *            within the returned {@link Runnable}.
-	 * @return The runnable implementation or null in case there is no. In case
-	 *         an {@link ActiveTriggerPolicy} is used, it can implement own
-	 *         {@link Runnable} classes. Such {@link Runnable} classes will be
-	 *         executed as an own thread and can submit fake elements, to the
-	 *         element buffer at any time.
-	 */
-	public Runnable createActiveTriggerRunnable(ActiveTriggerCallback callback);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CentralActiveTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CentralActiveTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CentralActiveTrigger.java
deleted file mode 100644
index 308f152..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CentralActiveTrigger.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * Interface for defining grouped windowing policies which can interact with
- * other groups to trigger on the latest available information globally
- * available to all groups.</p> At predefined time intervals the discretizers
- * takes the last globally seen element, and notifies all groups (but the one
- * that already have seen the object). This allows to trigger before an element
- * comes from the next window for a specific group. This pattern can be
- * used for instance in time policies to regularly broadcast the current time to
- * all groups.
- */
-public interface CentralActiveTrigger<DATA> extends CloneableTriggerPolicy<DATA> {
-
-	/**
-	 * This method is called to broadcast information about the last globally
-	 * seen data point to all triggers. The number of elements returned in the
-	 * array will determine the number of triggers at that point, while the
-	 * elements themselves are used only for active eviction.
-	 * 
-	 * @param datapoint
-	 *            The last globally seen data
-	 * @return An object of fake elements. If returned null or empty list, no
-	 *         triggers will occur.
-	 */
-	public Object[] notifyOnLastGlobalElement(DATA datapoint);
-
-}