You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:16:42 UTC

[06/10] flink git commit: [FLINK-2780] Remove Old Windowing Logic and API

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
deleted file mode 100644
index 6bc5072..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableEvictionPolicy.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-
-/**
- * When used in grouped windowing, eviction policies must
- * provide a clone method. Eviction policies get cloned to provide an own
- * instance for each group and respectively each individual element buffer as
- * groups maintain their own buffers with the elements belonging to the
- * respective group.
- * 
- * This interface extends {@link EvictionPolicy} with such a clone method. It
- * also adds the Java {@link Cloneable} interface as flag.
- * 
- * @param <DATA>
- *            The data type handled by this policy
- */
-public interface CloneableEvictionPolicy<DATA> extends EvictionPolicy<DATA>, Cloneable {
-
-	/**
-	 * This method should return an exact copy of the object it belongs to
-	 * including the current object state.
-	 * 
-	 * @return a copy of this object
-	 */
-	public CloneableEvictionPolicy<DATA> clone();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java
deleted file mode 100644
index 5adddc4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiEvictionPolicy.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.util.LinkedList;
-
-/**
- * This policy does the same as {@link MultiEvictionPolicy}. Additionally it is
- * cloneable and only cloneable policies can be passed to the constructor.
- * 
- * @param <DATA>
- *            The type of data handled by this policy
- */
-public class CloneableMultiEvictionPolicy<DATA> extends MultiEvictionPolicy<DATA> implements
-		CloneableEvictionPolicy<DATA> {
-
-	/**
-	 * Default version id.
-	 */
-	private static final long serialVersionUID = 1L;
-
-	private CloneableEvictionPolicy<DATA>[] allPolicies;
-	private EvictionStrategy strategy;
-
-	/**
-	 * This policy does the same as {@link MultiEvictionPolicy}. Additionally it
-	 * is cloneable and only cloneable policies can be passed to the
-	 * constructor.
-	 * 
-	 * When using this constructor the MAX strategy is used by default. You can
-	 * select other strategies using
-	 * {@link CloneableMultiEvictionPolicy#CloneableMultiEvictionPolicy(EvictionStrategy, CloneableEvictionPolicy...)}
-	 * .
-	 * 
-	 * @param evictionPolicies
-	 *            some cloneable policies to be tied together.
-	 */
-	public CloneableMultiEvictionPolicy(CloneableEvictionPolicy<DATA>... evictionPolicies) {
-		this(EvictionStrategy.MAX, evictionPolicies);
-	}
-
-	/**
-	 * This policy does the same as {@link MultiEvictionPolicy}. Additionally it
-	 * is cloneable and only cloneable policies can be passed to the
-	 * constructor.
-	 * 
-	 * @param strategy
-	 *            the strategy to be used. See {@link MultiEvictionPolicy.EvictionStrategy} for a
-	 *            list of possible options.
-	 * @param evictionPolicies
-	 *            some cloneable policies to be tied together.
-	 */
-	public CloneableMultiEvictionPolicy(EvictionStrategy strategy,
-			CloneableEvictionPolicy<DATA>... evictionPolicies) {
-		super(strategy, evictionPolicies);
-		this.allPolicies = evictionPolicies;
-		this.strategy = strategy;
-	}
-
-	@SuppressWarnings("unchecked")
-	public CloneableEvictionPolicy<DATA> clone() {
-		LinkedList<CloneableEvictionPolicy<DATA>> clonedPolicies = new LinkedList<CloneableEvictionPolicy<DATA>>();
-		for (int i = 0; i < allPolicies.length; i++) {
-			clonedPolicies.add(allPolicies[i].clone());
-		}
-		return new CloneableMultiEvictionPolicy<DATA>(strategy,
-				clonedPolicies.toArray(new CloneableEvictionPolicy[allPolicies.length]));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiTriggerPolicy.java
deleted file mode 100644
index aaecefb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableMultiTriggerPolicy.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.util.LinkedList;
-
-/**
- * This policy does the same as {@link MultiTriggerPolicy}. Additionally it is
- * cloneable and only cloneable policies can be passed to the constructor.
- * 
- * @param <DATA>
- *            The type of data handled by this policy
- */
-public class CloneableMultiTriggerPolicy<DATA> extends MultiTriggerPolicy<DATA> implements
-		CloneableTriggerPolicy<DATA>, Cloneable {
-
-	/**
-	 * Default version id.
-	 */
-	private static final long serialVersionUID = 1L;
-
-	private CloneableTriggerPolicy<DATA>[] allPolicies;
-
-	/**
-	 * This policy does the same as {@link MultiTriggerPolicy}. Additionally it
-	 * is cloneable and only cloneable policies can be passed to the
-	 * constructor.
-	 * 
-	 * @param policies
-	 *            some cloneable policies to be tied together.
-	 */
-	public CloneableMultiTriggerPolicy(CloneableTriggerPolicy<DATA>... policies) {
-		super(policies);
-		this.allPolicies = policies;
-	}
-
-	@SuppressWarnings("unchecked")
-	public CloneableTriggerPolicy<DATA> clone() {
-		LinkedList<CloneableTriggerPolicy<DATA>> clonedPolicies = new LinkedList<CloneableTriggerPolicy<DATA>>();
-		for (int i = 0; i < allPolicies.length; i++) {
-			clonedPolicies.add(allPolicies[i].clone());
-		}
-		return new CloneableMultiTriggerPolicy<DATA>(
-				clonedPolicies.toArray(new CloneableTriggerPolicy[allPolicies.length]));
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
deleted file mode 100644
index 3f55f41..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CloneableTriggerPolicy.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-
-/**
- * When used in grouped windowing, trigger policies can provide
- * a clone method. Cloneable triggers can be used in a distributed manner,
- * which means they get cloned to provide an own instance for each group. This
- * allows each group to trigger individually and only based on the elements
- * belonging to the respective group.
- * 
- * This interface extends {@link TriggerPolicy} with such a clone method. It
- * also adds the Java {@link Cloneable} interface as flag.
- * 
- * @param <DATA>
- *            The data type handled by this policy
- */
-public interface CloneableTriggerPolicy<DATA> extends TriggerPolicy<DATA>, Cloneable {
-
-	/**
-	 * This method should return an exact copy of the object it belongs to
-	 * including the current object state.
-	 * 
-	 * @return a copy of this object
-	 */
-	public CloneableTriggerPolicy<DATA> clone();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
deleted file mode 100644
index 9be25d3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * This eviction policy allows the eviction of data points from the buffer using
- * a counter of arriving elements and a threshold (maximal buffer size)
- * 
- * By default this policy does not react on fake elements. Wrap it in an
- * {@link ActiveEvictionPolicyWrapper} to make it count even fake elements.
- * 
- * @param <IN>
- *            the type of the incoming data points
- */
-public class CountEvictionPolicy<IN> implements CloneableEvictionPolicy<IN> {
-
-	/**
-	 * Auto generated version id
-	 */
-	private static final long serialVersionUID = 2319201348806427996L;
-
-	private int maxElements;
-	private int counter;
-	private int deleteOnEviction = 1;
-	private int startValue;
-
-	/**
-	 * This constructor allows the setup of the simplest possible count based
-	 * eviction. It keeps the size of the buffer according to the given
-	 * maxElements parameter by deleting the oldest element in the buffer.
-	 * Eviction only takes place if the counter of arriving elements would be
-	 * higher than maxElements without eviction.
-	 * 
-	 * @param maxElements
-	 *            The maximum number of elements before eviction. As soon as one
-	 *            more element arrives, the oldest element will be deleted
-	 */
-	public CountEvictionPolicy(int maxElements) {
-		this(maxElements, 1);
-	}
-
-	/**
-	 * This constructor allows to set up both, the maximum number of elements
-	 * and the number of elements to be deleted in case of an eviction.
-	 * 
-	 * Eviction only takes place if the counter of arriving elements would be
-	 * higher than maxElements without eviction. In such a case deleteOnEviction
-	 * elements will be removed from the buffer.
-	 * 
-	 * The counter of arriving elements is adjusted respectively, but never set
-	 * below zero:
-	 * counter=(counter-deleteOnEviction<0)?0:counter-deleteOnEviction
-	 * 
-	 * @param maxElements
-	 *            The maximum number of elements before eviction.
-	 * @param deleteOnEviction
-	 *            The number of elements to be deleted on eviction. The counter
-	 *            will be adjusted respectively but never below zero.
-	 */
-	public CountEvictionPolicy(int maxElements, int deleteOnEviction) {
-		this(maxElements, deleteOnEviction, 0);
-	}
-
-	/**
-	 * The same as {@link CountEvictionPolicy#CountEvictionPolicy(int, int)}.
-	 * Additionally a custom start value for the counter of arriving elements
-	 * can be set. By setting a negative start value the first eviction can be
-	 * delayed.
-	 * 
-	 * @param maxElements
-	 *            The maximum number of elements before eviction.
-	 * @param deleteOnEviction
-	 *            The number of elements to be deleted on eviction. The counter
-	 *            will be adjusted respectively but never below zero.
-	 * @param startValue
-	 *            A custom start value for the counter of arriving elements.
-	 * @see CountEvictionPolicy#CountEvictionPolicy(int, int)
-	 */
-	public CountEvictionPolicy(int maxElements, int deleteOnEviction, int startValue) {
-		this.counter = startValue;
-		this.deleteOnEviction = deleteOnEviction;
-		this.maxElements = maxElements;
-		this.startValue = startValue;
-	}
-
-	@Override
-	public int notifyEviction(IN datapoint, boolean triggered, int bufferSize) {
-		// The comparison has to be >= and not == to cover case max=0
-		if (counter >= maxElements) {
-			// Adjust the counter according to the current eviction
-			counter = (counter - deleteOnEviction < 0) ? 0 : counter - deleteOnEviction;
-			// The current element will be added after the eviction
-			// Therefore, increase counter in any case
-			counter++;
-			return deleteOnEviction;
-		} else {
-			counter++;
-			return 0;
-		}
-	}
-
-	@Override
-	public CountEvictionPolicy<IN> clone() {
-		return new CountEvictionPolicy<IN>(maxElements, deleteOnEviction, counter);
-	}
-
-	@Override
-	public boolean equals(Object other) {
-		if (other == null || !(other instanceof CountEvictionPolicy)) {
-			return false;
-		} else {
-			try {
-				@SuppressWarnings("unchecked")
-				CountEvictionPolicy<IN> otherPolicy = (CountEvictionPolicy<IN>) other;
-				return startValue == otherPolicy.startValue
-						&& deleteOnEviction == otherPolicy.deleteOnEviction
-						&& maxElements == otherPolicy.maxElements;
-			} catch (ClassCastException e) {
-				return false;
-			}
-		}
-	}
-
-	public int getWindowSize() {
-		return maxElements;
-	}
-
-	public int getStart() {
-		return startValue;
-	}
-	
-	public int getDeleteOnEviction(){
-		return deleteOnEviction;
-	}
-
-	@Override
-	public String toString() {
-		return "CountPolicy(" + maxElements + ")";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
deleted file mode 100644
index 9bd6f82..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * This policy triggers at every n'th element.
- * 
- * @param <IN>
- *            The type of the data points which are handled by this policy
- */
-public class CountTriggerPolicy<IN> implements CloneableTriggerPolicy<IN> {
-
-	/**
-	 * Auto generated version ID
-	 */
-	private static final long serialVersionUID = -6357200688886103968L;
-
-	public static final int DEFAULT_START_VALUE = 0;
-
-	private int counter;
-	private int max;
-	private int startValue;
-
-	/**
-	 * This constructor will set up a count based trigger, which triggers after
-	 * max elements have arrived.
-	 * 
-	 * @param max
-	 *            The number of arriving elements before the trigger occurs.
-	 */
-	public CountTriggerPolicy(int max) {
-		this(max, DEFAULT_START_VALUE);
-	}
-
-	/**
-	 * In addition to {@link CountTriggerPolicy#CountTriggerPolicy(int)} this
-	 * constructor allows to set a custom start value for the element counter.
-	 * This can be used to delay the first trigger by setting a negative start
-	 * value. Often the first trigger should be delayed in case of sliding
-	 * windows. For example if the size of a window should be 4 and a trigger
-	 * should happen every 2, a start value of -2 would allow to also have the
-	 * first window of size 4.
-	 * 
-	 * @param max
-	 *            The number of arriving elements before the trigger occurs.
-	 * @param startValue
-	 *            The start value for the counter of arriving elements.
-	 * @see CountTriggerPolicy#CountTriggerPolicy(int)
-	 */
-	public CountTriggerPolicy(int max, int startValue) {
-		this.max = max;
-		this.counter = startValue;
-		this.startValue = startValue;
-	}
-
-	@Override
-	public boolean notifyTrigger(IN datapoint) {
-		// The comparison has to be >= and not == to cover case max=0
-		if (counter >= max) {
-			// The current data point will be part of the next window!
-			// Therefore the counter needs to be set to one already.
-			counter = 1;
-			return true;
-		} else {
-			counter++;
-			return false;
-		}
-	}
-
-	@Override
-	public CountTriggerPolicy<IN> clone() {
-		return new CountTriggerPolicy<IN>(max, counter);
-	}
-
-	@Override
-	public boolean equals(Object other) {
-		if (other == null || !(other instanceof CountTriggerPolicy)) {
-			return false;
-		} else {
-			try {
-				@SuppressWarnings("unchecked")
-				CountTriggerPolicy<IN> otherPolicy = (CountTriggerPolicy<IN>) other;
-				return max == otherPolicy.max && startValue == otherPolicy.startValue;
-			} catch (ClassCastException e) {
-				return false;
-			}
-		}
-	}
-
-	public int getSlideSize() {
-		return max;
-	}
-	
-	public int getStart() {
-		return startValue;
-	}
-
-	@Override
-	public String toString() {
-		return "CountPolicy(" + max + ")";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
deleted file mode 100644
index 0b6a493..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
-
-/**
- * This policy calculates a delta between the data point which triggered last
- * and the currently arrived data point. It triggers if the delta is higher than
- * a specified threshold.
- * 
- * In case it gets used for eviction, this policy starts from the first element
- * of the buffer and removes all elements from the buffer which have a higher
- * delta than the threshold. As soon as there is an element with a lower delta,
- * the eviction stops.
- * 
- * By default this policy does not react on fake elements. Wrap it in an
- * {@link ActiveEvictionPolicyWrapper} to make it calculate the delta even on
- * fake elements.
- * 
- * @param <DATA>
- *            The type of the data points which are handled by this policy
- */
-public class DeltaPolicy<DATA> implements CloneableTriggerPolicy<DATA>,
-		CloneableEvictionPolicy<DATA> {
-
-	/**
-	 * Auto generated version ID
-	 */
-	private static final long serialVersionUID = -7797538922123394967L;
-
-	//Used for serializing the threshold
-	private final static int INITIAL_SERIALIZER_BYTES = 1024;
-
-	protected DeltaFunction<DATA> deltaFuntion;
-	private List<DATA> windowBuffer;
-	protected double threshold;
-	private TypeSerializer<DATA> typeSerializer;
-	protected transient DATA triggerDataPoint;
-
-	/**
-	 * Creates a delta policy which calculates a delta between the data point
-	 * which triggered last and the currently arrived data point. It triggers if
-	 * the delta is higher than a specified threshold. As the data may be sent to
-	 * the cluster a {@link TypeSerializer} is needed for the initial value.
-	 *
-	 * <p>
-	 * In case it gets used for eviction, this policy starts from the first
-	 * element of the buffer and removes all elements from the buffer which have
-	 * a higher delta than the threshold. As soon as there is an element with a
-	 * lower delta, the eviction stops.
-	 * </p>
-	 *
-	 * @param deltaFuntion
-	 * 				The delta function to be used.
-	 * @param init
-	 *				The initial value to be used for the calculation of a delta before
-	 *				the first trigger.
-	 * @param threshold
-	 * 				The threshold upon which a triggering should happen.
-	 * @param typeSerializer
-	 * 				TypeSerializer to properly forward the initial value to
-	 * 				the cluster
-	 */
-	@SuppressWarnings("unchecked")
-	public DeltaPolicy(DeltaFunction<DATA> deltaFuntion, DATA init, double threshold, TypeSerializer typeSerializer) {
-		this.deltaFuntion = deltaFuntion;
-		this.triggerDataPoint = init;
-		this.windowBuffer = new LinkedList<DATA>();
-		this.threshold = threshold;
-		this.typeSerializer = typeSerializer;
-	}
-
-	@Override
-	public boolean notifyTrigger(DATA datapoint) {
-		if (deltaFuntion.getDelta(this.triggerDataPoint, datapoint) > this.threshold) {
-			this.triggerDataPoint = datapoint;
-			return true;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) {
-		windowBuffer = windowBuffer.subList(windowBuffer.size() - bufferSize, bufferSize);
-		int evictCount = 0;
-		for (DATA bufferPoint : windowBuffer) {
-			if (deltaFuntion.getDelta(bufferPoint, datapoint) < this.threshold) {
-				break;
-			}
-			evictCount++;
-		}
-
-		if (evictCount > 0) {
-			windowBuffer = windowBuffer.subList(evictCount, windowBuffer.size());
-		}
-		windowBuffer.add(datapoint);
-		return evictCount;
-	}
-
-	@Override
-	public DeltaPolicy<DATA> clone() {
-		return new DeltaPolicy<DATA>(deltaFuntion, triggerDataPoint, threshold, typeSerializer);
-	}
-
-	@Override
-	public boolean equals(Object other) {
-		if (other == null || !(other instanceof DeltaPolicy)) {
-			return false;
-		} else {
-			try {
-				@SuppressWarnings("unchecked")
-				DeltaPolicy<DATA> otherPolicy = (DeltaPolicy<DATA>) other;
-				return threshold == otherPolicy.threshold
-						&& deltaFuntion.getClass() == otherPolicy.deltaFuntion.getClass()
-						&& triggerDataPoint.equals(otherPolicy.triggerDataPoint);
-			} catch (ClassCastException e) {
-				return false;
-			}
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "DeltaPolicy(" + threshold + ", " + deltaFuntion.getClass().getSimpleName() + ")";
-	}
-
-	private void writeObject(ObjectOutputStream stream) throws IOException{
-		stream.defaultWriteObject();
-		DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(INITIAL_SERIALIZER_BYTES);
-		typeSerializer.serialize(triggerDataPoint, dataOutputSerializer);
-		stream.write(dataOutputSerializer.getByteArray());
-	}
-
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
-		stream.defaultReadObject();
-		byte[] bytes = new byte[stream.available()];
-		stream.readFully(bytes);
-		triggerDataPoint = typeSerializer.deserialize(new DataInputDeserializer(bytes, 0, bytes.length));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java
deleted file mode 100644
index b95053a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/EvictionPolicy.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.io.Serializable;
-
-/**
- * An eviction policy specifies under which condition data points should be
- * deleted from the buffer. Deletions must be done only in the order the
- * elements arrived. Therefore, the policy only returns the number of elements
- * to evict on each element arrival.
- * 
- * @param <DATA>
- *            the type of the data handled by this policy
- */
-public interface EvictionPolicy<DATA> extends Serializable {
-
-	/**
-	 * Proves if and how many elements should be deleted from the element
-	 * buffer. The eviction takes place after the trigger and after the call to
-	 * the UDF but before the adding of the new data point.
-	 *
-	 * @param datapoint
-	 *            data point the data point which arrived
-	 * @param triggered
-	 *            Information whether the UDF was triggered or not
-	 * @param bufferSize
-	 *            The current size of the element buffer at the operator
-	 * @return The number of elements to be deleted from the buffer
-	 */
-	public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/KeepAllEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/KeepAllEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/KeepAllEvictionPolicy.java
deleted file mode 100644
index 6fad749..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/KeepAllEvictionPolicy.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-public class KeepAllEvictionPolicy<T> implements EvictionPolicy<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public int notifyEviction(T datapoint, boolean triggered, int bufferSize) {
-		return 0;
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java
deleted file mode 100644
index 79e8119..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiEvictionPolicy.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * This policy provides the ability to use multiple eviction policies at the
- * same time. It allows to use both, active and not active evictions.
- * 
- * @param <DATA>
- *            The type of data-items handled by the policies
- */
-public class MultiEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA> {
-
-	/**
-	 * Default version id.
-	 */
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * This enum provides the different options for the eviction strategy.
-	 * 
-	 * You can choose from the following:
-	 * <ul>
-	 * <li>MIN: The number of elements to evict will be the smallest one which
-	 * is greater than 0 and was returned by any of the given policies. If all
-	 * policies return 0, the result is 0.</li>
-	 * <li>MAX: The number of elements to evict will be the greatest one which
-	 * was returned by any of the given policies.</li>
-	 * <li>SUM: The number of elements to evict will be the sum of all values
-	 * returned by the nested eviction policies.</li>
-	 * <li>PRIORITY: Depending on the order in which the policies have been
-	 * passed to the constructor, the first return value greater than 0 will be
-	 * the the number of elements to evict. If all policies return 0, the result
-	 * is 0.</li>
-	 * </ul>
-	 */
-	public enum EvictionStrategy {
-		MIN, MAX, SUM, PRIORITY
-	}
-
-	private List<EvictionPolicy<DATA>> allEvictionPolicies;
-	private List<ActiveEvictionPolicy<DATA>> activeEvictionPolicies;
-	private EvictionStrategy selectedStrategy;
-
-	/**
-	 * This policy provides the ability to use multiple eviction policies at the
-	 * same time. It allows to use both, active and not active evictions.
-	 * 
-	 * When using this constructor the MAX strategy is used by default. You can
-	 * select other strategies using
-	 * {@link MultiEvictionPolicy#MultiEvictionPolicy(EvictionStrategy, EvictionPolicy...)}
-	 * .
-	 * 
-	 * @param evictionPolicies
-	 *            Any active or not active eviction policies. Both types can be
-	 *            used at the same time.
-	 */
-	public MultiEvictionPolicy(EvictionPolicy<DATA>... evictionPolicies) {
-		this(EvictionStrategy.MAX, evictionPolicies);
-	}
-
-	/**
-	 * This policy provides the ability to use multiple eviction policies at the
-	 * same time. It allows to use both, active and not active evictions.
-	 * 
-	 * @param strategy
-	 *            the strategy to be used. See {@link EvictionStrategy} for a
-	 *            list of possible options.
-	 * @param evictionPolicies
-	 *            Any active or not active eviction policies. Both types can be
-	 *            used at the same time.
-	 */
-	public MultiEvictionPolicy(EvictionStrategy strategy, EvictionPolicy<DATA>... evictionPolicies) {
-		// initialize lists of policies
-		this.allEvictionPolicies = new LinkedList<EvictionPolicy<DATA>>();
-		this.activeEvictionPolicies = new LinkedList<ActiveEvictionPolicy<DATA>>();
-
-		// iterate over policies and add them to the lists
-		for (EvictionPolicy<DATA> ep : evictionPolicies) {
-			this.allEvictionPolicies.add(ep);
-			if (ep instanceof ActiveEvictionPolicy) {
-				this.activeEvictionPolicies.add((ActiveEvictionPolicy<DATA>) ep);
-			}
-		}
-
-		// Remember eviction strategy
-		this.selectedStrategy = strategy;
-	}
-
-	@Override
-	public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) {
-		LinkedList<Integer> results = new LinkedList<Integer>();
-		for (EvictionPolicy<DATA> policy : allEvictionPolicies) {
-			results.add(policy.notifyEviction(datapoint, triggered, bufferSize));
-		}
-		return getNumToEvict(results);
-	}
-
-	@Override
-	public int notifyEvictionWithFakeElement(Object datapoint, int bufferSize) {
-		LinkedList<Integer> results = new LinkedList<Integer>();
-		for (ActiveEvictionPolicy<DATA> policy : activeEvictionPolicies) {
-			results.add(policy.notifyEvictionWithFakeElement(datapoint, bufferSize));
-		}
-		return getNumToEvict(results);
-	}
-
-	private int getNumToEvict(LinkedList<Integer> items) {
-		int result;
-		switch (selectedStrategy) {
-
-		case MIN:
-			result = Integer.MAX_VALUE;
-			for (Integer item : items) {
-				if (result > item) {
-					result = item;
-				}
-			}
-			return result;
-
-		case MAX:
-			result = 0;
-			for (Integer item : items) {
-				if (result < item) {
-					result = item;
-				}
-			}
-			return result;
-
-		case SUM:
-			result = 0;
-			for (Integer item : items) {
-				result += item;
-			}
-			return result;
-
-		case PRIORITY:
-			for (Integer item : items) {
-				if (item > 0) {
-					return item;
-				}
-			}
-			return 0;
-
-		default:
-			// The following line should never be reached. Just for the
-			// compiler.
-			return 0;
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicy.java
deleted file mode 100644
index a3c6a22..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicy.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * This class allows to use multiple trigger policies at the same time. It
- * allows to use both, active and not active triggers.
- * 
- * @param <DATA>
- *            the data type handled by this policy
- */
-public class MultiTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA> {
-
-	/**
-	 * Default version id.
-	 */
-	private static final long serialVersionUID = 1L;
-
-	private List<TriggerPolicy<DATA>> allTriggerPolicies;
-	private List<ActiveTriggerPolicy<DATA>> activeTriggerPolicies;
-
-	/**
-	 * This policy allows to use multiple trigger policies at the same time. It
-	 * allows to use both, active and not active triggers.
-	 * 
-	 * This policy triggers in case at least one of the nested policies
-	 * triggered. If active policies are nested all produces fake elements will
-	 * be returned.
-	 * 
-	 * @param policies
-	 *            Any active or not active trigger policies. Both types can be
-	 *            used at the same time.
-	 */
-	public MultiTriggerPolicy(TriggerPolicy<DATA>... policies) {
-		allTriggerPolicies = new LinkedList<TriggerPolicy<DATA>>();
-		activeTriggerPolicies = new LinkedList<ActiveTriggerPolicy<DATA>>();
-
-		for (TriggerPolicy<DATA> policy : policies) {
-			this.allTriggerPolicies.add(policy);
-			if (policy instanceof ActiveTriggerPolicy) {
-				this.activeTriggerPolicies.add((ActiveTriggerPolicy<DATA>) policy);
-			}
-		}
-	}
-
-	@Override
-	public boolean notifyTrigger(DATA datapoint) {
-		boolean trigger = false;
-		for (TriggerPolicy<DATA> policy : allTriggerPolicies) {
-			if (policy.notifyTrigger(datapoint)) {
-				trigger = true;
-				// Do not at a break here. All trigger must see the element!
-			}
-		}
-		return trigger;
-	}
-
-	@Override
-	public Object[] preNotifyTrigger(DATA datapoint) {
-		List<Object> fakeElements = new LinkedList<Object>();
-		for (ActiveTriggerPolicy<DATA> policy : activeTriggerPolicies) {
-			for (Object fakeElement : policy.preNotifyTrigger(datapoint)) {
-				fakeElements.add(fakeElement);
-			}
-		}
-		return fakeElements.toArray();
-	}
-
-	@Override
-	public Runnable createActiveTriggerRunnable(ActiveTriggerCallback callback) {
-		List<Runnable> runnables = new LinkedList<Runnable>();
-		for (ActiveTriggerPolicy<DATA> policy : activeTriggerPolicies) {
-			Runnable tmp = policy.createActiveTriggerRunnable(callback);
-			if (tmp != null) {
-				runnables.add(tmp);
-			}
-		}
-		if (runnables.size() == 0) {
-			return null;
-		} else {
-			return new MultiActiveTriggerRunnable(runnables);
-		}
-	}
-
-	/**
-	 * This class serves a nest for all active trigger runnables. Once the run
-	 * method gets executed, all the runnables are started in own threads.
-	 */
-	private class MultiActiveTriggerRunnable implements Runnable {
-
-		List<Runnable> runnables;
-
-		MultiActiveTriggerRunnable(List<Runnable> runnables) {
-			this.runnables = runnables;
-		}
-
-		@Override
-		public void run() {
-			for (Runnable runnable : runnables) {
-				new Thread(runnable).start();
-			}
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java
deleted file mode 100644
index eaa8063..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
-
-/**
- * This policy can be used to trigger and evict based on a punctuation which is
- * present within the arriving data. Using this policy, one can react on an
- * externally defined arbitrary windowing semantic.
- * 
- * In case this policy is used for eviction, the complete buffer will get
- * deleted in case the punctuation is detected.
- * 
- * By default this policy does not react on fake elements. Wrap it in an
- * {@link ActiveEvictionPolicyWrapper} to make it react on punctuation even in
- * fake elements.
- * 
- * @param <IN>
- *            The type of the input data handled by this policy. An
- *            {@link Extractor} can be used to extract DATA for IN.
- * @param <DATA>
- *            The type of the punctuation. An {@link Extractor} can be used to
- *            extract DATA for IN.
- */
-public class PunctuationPolicy<IN, DATA> implements CloneableTriggerPolicy<IN>,
-		CloneableEvictionPolicy<IN> {
-
-	/**
-	 * auto generated version id
-	 */
-	private static final long serialVersionUID = -8845130188912602498L;
-	private int counter = 0;
-	private Extractor<IN, DATA> extractor;
-	private DATA punctuation;
-
-	/**
-	 * Creates the punctuation policy without using any extractor. To make this
-	 * work IN and DATA must not be different types.
-	 * 
-	 * @param punctuation
-	 *            the punctuation which leads to trigger/evict.
-	 */
-	public PunctuationPolicy(DATA punctuation) {
-		this(punctuation, null);
-	}
-
-	/**
-	 * Creates the punctuation policy which uses the specified extractor to
-	 * isolate the punctuation from the data.
-	 * 
-	 * @param punctuation
-	 *            the punctuation which leads to trigger/evict.
-	 * @param extractor
-	 *            An {@link Extractor} which converts IN to DATA.
-	 */
-	public PunctuationPolicy(DATA punctuation, Extractor<IN, DATA> extractor) {
-		this.punctuation = punctuation;
-		this.extractor = extractor;
-	}
-
-	@Override
-	public int notifyEviction(IN datapoint, boolean triggered, int bufferSize) {
-		if (notifyTrigger(datapoint)) {
-			int tmp = counter;
-			// As the current will be add after the eviction the counter needs
-			// to be set to one already
-			counter = 1;
-			return tmp;
-		} else {
-			counter++;
-			return 0;
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public boolean notifyTrigger(IN datapoint) {
-		DATA tmp;
-
-		// eventually extract data
-		if (extractor == null) {
-			// unchecked convert (cannot check it here)
-			tmp = (DATA) datapoint;
-		} else {
-			tmp = extractor.extract(datapoint);
-		}
-
-		// compare data with punctuation
-		if (punctuation.equals(tmp)) {
-			return true;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public PunctuationPolicy<IN, DATA> clone() {
-		return new PunctuationPolicy<IN, DATA>(punctuation, extractor);
-	}
-
-	@Override
-	public boolean equals(Object other) {
-		if (other == null || !(other instanceof PunctuationPolicy)) {
-			return false;
-		} else {
-			try {
-				@SuppressWarnings("unchecked")
-				PunctuationPolicy<IN, DATA> otherPolicy = (PunctuationPolicy<IN, DATA>) other;
-				if (extractor != null) {
-					return extractor.getClass() == otherPolicy.extractor.getClass()
-							&& punctuation.equals(otherPolicy.punctuation);
-				} else {
-					return punctuation.equals(otherPolicy.punctuation)
-							&& otherPolicy.extractor == null;
-				}
-
-			} catch (Exception e) {
-				return false;
-			}
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "PunctuationPolicy(" + punctuation
-				+ (extractor != null
-					? ", " + extractor.getClass().getSimpleName()
-					: "")
-				+ ")";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
deleted file mode 100644
index ae17e29..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.util.LinkedList;
-
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-/**
- * This eviction policy evicts all elements which are older then a specified
- * time. The time is measured using a given {@link Timestamp} implementation. A
- * point in time is always represented as long. Therefore, the granularity can
- * be set as long value as well.
- * 
- * @param <DATA>
- *            The type of the incoming data points which are processed by this
- *            policy.
- */
-public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
-		CloneableEvictionPolicy<DATA> {
-
-	/**
-	 * auto generated version id
-	 */
-	private static final long serialVersionUID = -1457476766124518220L;
-
-	private long granularity;
-	private TimestampWrapper<DATA> timestampWrapper;
-	private LinkedList<Long> buffer = new LinkedList<Long>();
-
-	/**
-	 * This eviction policy evicts all elements which are older than a specified
-	 * time. The time is measured using a given {@link Timestamp}
-	 * implementation. A point in time is always represented as long. Therefore,
-	 * the granularity can be set as long value as well. If this value is set to
-	 * 2 the policy will evict all elements which are older as 2.
-	 * 
-	 * <code>
-	 *   while (time(firstInBuffer)<current time-granularity){
-	 *   	evict firstInBuffer;
-	 *   }
-	 * </code>
-	 * 
-	 * @param granularity
-	 *            The granularity of the eviction. If this value is set to 2 the
-	 *            policy will evict all elements which are older as 2(if
-	 *            (time(X)<current time-granularity) evict X).
-	 * @param timestampWrapper
-	 *            The {@link TimestampWrapper} to measure the time with. This
-	 *            can be either user defined of provided by the API.
-	 */
-	public TimeEvictionPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper) {
-		this.timestampWrapper = timestampWrapper;
-		this.granularity = granularity;
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public int notifyEvictionWithFakeElement(Object datapoint, int bufferSize) {
-		checkForDeleted(bufferSize);
-
-		long threshold;
-		try {
-			threshold = (Long) datapoint - granularity;
-		} catch (ClassCastException e) {
-			threshold = timestampWrapper.getTimestamp((DATA) datapoint) - granularity;
-		}
-
-		// return result
-		return deleteAndCountExpired(threshold);
-
-	}
-
-	@Override
-	public int notifyEviction(DATA datapoint, boolean triggered, int bufferSize) {
-
-		checkForDeleted(bufferSize);
-
-		// remember timestamp
-		long time = timestampWrapper.getTimestamp(datapoint);
-
-		// delete and count expired tuples
-		long threshold = time - granularity;
-		int counter = deleteAndCountExpired(threshold);
-
-		// Add current element to buffer
-		buffer.add(time);
-
-		// return result
-		return counter;
-
-	}
-
-	private void checkForDeleted(int bufferSize) {
-		// check for deleted tuples (deletes by other policies)
-		while (bufferSize < this.buffer.size()) {
-			this.buffer.removeFirst();
-		}
-	}
-
-	private int deleteAndCountExpired(long threshold) {
-		int counter = 0;
-		while (!buffer.isEmpty()) {
-
-			if (buffer.getFirst() <= threshold) {
-				buffer.removeFirst();
-				counter++;
-			} else {
-				break;
-			}
-		}
-		return counter;
-
-	}
-
-	@Override
-	public TimeEvictionPolicy<DATA> clone() {
-		return new TimeEvictionPolicy<DATA>(granularity, timestampWrapper);
-	}
-
-	@Override
-	public boolean equals(Object other) {
-		if (other == null || !(other instanceof TimeEvictionPolicy)) {
-			return false;
-		} else {
-			try {
-				@SuppressWarnings("unchecked")
-				TimeEvictionPolicy<DATA> otherPolicy = (TimeEvictionPolicy<DATA>) other;
-				return granularity == otherPolicy.granularity
-						&& timestampWrapper.equals(otherPolicy.timestampWrapper);
-			} catch (ClassCastException e) {
-				return false;
-			}
-		}
-	}
-
-	public long getWindowSize() {
-		return granularity;
-	}
-
-	@Override
-	public String toString() {
-		return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName()
-				+ ")";
-	}
-
-	public TimestampWrapper<DATA> getTimeStampWrapper() {
-		return timestampWrapper;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
deleted file mode 100644
index 03984a9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.util.LinkedList;
-
-import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-/**
- * This trigger policy triggers with regard to the time. The is measured using a
- * given {@link Timestamp} implementation. A point in time is always represented
- * as long. Therefore, parameters such as granularity and delay can be set as
- * long value as well.
- * 
- * @param <DATA>
- *            The type of the incoming data points which are processed by this
- *            policy.
- */
-public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
-		CloneableTriggerPolicy<DATA>, CentralActiveTrigger<DATA> {
-
-	/**
-	 * auto generated version id
-	 */
-	private static final long serialVersionUID = -5122753802440196719L;
-
-	protected long startTime;
-	public long granularity;
-	public TimestampWrapper<DATA> timestampWrapper;
-
-	/**
-	 * This is mostly the same as
-	 * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimestampWrapper)}. In addition
-	 * to granularity and timestamp a delay can be specified for the first
-	 * trigger. If the start time given by the timestamp is x, the delay is y,
-	 * and the granularity is z, the first trigger will happen at x+y+z.
-	 * 
-	 * @param granularity
-	 *            The granularity of the trigger. If this value is set to 2 the
-	 *            policy will trigger at every second time point
-	 * @param timestampWrapper
-	 *            The {@link TimestampWrapper} to measure the time with. This
-	 *            can be either user defined of provided by the API.
-	 */
-	public TimeTriggerPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper) {
-		this.startTime = timestampWrapper.getStartTime();
-		this.timestampWrapper = timestampWrapper;
-		this.granularity = granularity;
-	}
-
-	/**
-	 * This method checks if we missed a window end. If this is the case we
-	 * trigger the missed windows using fake elements.
-	 */
-	@Override
-	public synchronized Object[] preNotifyTrigger(DATA datapoint) {
-		LinkedList<Object> fakeElements = new LinkedList<Object>();
-		// check if there is more then one window border missed
-		// use > here. In case >= would fit, the regular call will do the job.
-		while (timestampWrapper.getTimestamp(datapoint) >= startTime + granularity) {
-			startTime += granularity;
-			fakeElements.add(startTime - 1);
-		}
-		return (Object[]) fakeElements.toArray();
-	}
-
-	/**
-	 * In case {@link SystemTimestamp} is used, a runnable is returned which
-	 * triggers based on the current system time. If any other time measure is
-	 * used the method returns null.
-	 * 
-	 * @param callback
-	 *            The object which is takes the callbacks for adding fake
-	 *            elements out of the runnable.
-	 * @return A runnable is returned which triggers based on the current system
-	 *         time. If any other time measure is used the method return null.
-	 */
-	@Override
-	public Runnable createActiveTriggerRunnable(ActiveTriggerCallback callback) {
-		if (this.timestampWrapper.isDefaultTimestamp()) {
-			return new TimeCheck(callback);
-		} else {
-			return null;
-		}
-	}
-
-	/**
-	 * This method is only called in case the runnable triggers a window end
-	 * according to the {@link SystemTimestamp}.
-	 * 
-	 * @param callback
-	 *            The callback object.
-	 */
-	public synchronized Object activeFakeElementEmission(ActiveTriggerCallback callback) {
-
-		// start time is excluded, but end time is included: >=
-		if (System.currentTimeMillis() >= startTime + granularity) {
-			startTime += granularity;
-			if (callback != null) {
-				callback.sendFakeElement(startTime - 1);
-			}
-			return startTime - 1;
-		}
-		return null;
-
-	}
-
-	private class TimeCheck implements Runnable {
-		ActiveTriggerCallback callback;
-
-		public TimeCheck(ActiveTriggerCallback callback) {
-			this.callback = callback;
-		}
-
-		@Override
-		public void run() {
-			while (true) {
-				// wait for the specified granularity
-				try {
-					Thread.sleep(granularity);
-				} catch (InterruptedException e) {
-					// ignore it...
-				}
-				// Trigger using the respective methods. Methods are
-				// synchronized to prevent race conditions between real and fake
-				// elements at the policy.
-				activeFakeElementEmission(callback);
-			}
-		}
-	}
-
-	@Override
-	public synchronized boolean notifyTrigger(DATA datapoint) {
-		long recordTime = timestampWrapper.getTimestamp(datapoint);
-		if (recordTime >= startTime + granularity) {
-			if (granularity != 0) {
-				startTime = recordTime - ((recordTime - startTime) % granularity);
-			}
-			return true;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public TimeTriggerPolicy<DATA> clone() {
-		return new TimeTriggerPolicy<DATA>(granularity, timestampWrapper);
-	}
-
-	@Override
-	public boolean equals(Object other) {
-		if (other == null || !(other instanceof TimeTriggerPolicy)) {
-			return false;
-		} else {
-			try {
-				@SuppressWarnings("unchecked")
-				TimeTriggerPolicy<DATA> otherPolicy = (TimeTriggerPolicy<DATA>) other;
-				return startTime == otherPolicy.startTime && granularity == otherPolicy.granularity
-						&& timestampWrapper.equals(otherPolicy.timestampWrapper);
-			} catch (ClassCastException e) {
-				return false;
-			}
-		}
-	}
-
-	public long getSlideSize() {
-		return granularity;
-	}
-
-	@Override
-	public String toString() {
-		return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName()
-				+ ")";
-	}
-
-	public TimestampWrapper<DATA> getTimeStampWrapper() {
-		return timestampWrapper;
-	}
-
-	@Override
-	public Object[] notifyOnLastGlobalElement(DATA datapoint) {
-		LinkedList<Object> fakeElements = new LinkedList<Object>();
-		// check if there is more then one window border missed
-		// use > here. In case >= would fit, the regular call will do the job.
-		while (timestampWrapper.getTimestamp(datapoint) >= startTime + granularity) {
-			startTime += granularity;
-			fakeElements.add(startTime - 1);
-		}
-		return (Object[]) fakeElements.toArray();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TriggerPolicy.java
deleted file mode 100644
index c212df6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TriggerPolicy.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-import java.io.Serializable;
-
-/**
- * Decides whether a new window should be started. When the trigger fires
- * (return value true), the UDF is executed on the current element buffer
- * without the newly arrived element, which is passed as the parameter; that
- * element is added to the buffer only after the UDF has been executed.
- *
- * @param <DATA>
- *            The data type which can be handled by this policy
- */
-public interface TriggerPolicy<DATA> extends Serializable {
-
-	/**
-	 * Decides whether a new window should be started. When the trigger fires
-	 * (return value true), the UDF is executed on the current element buffer
-	 * without the newly arrived element, which is passed as the parameter;
-	 * that element is added to the buffer only after the UDF has been
-	 * executed.
-	 *
-	 * Two strategies are possible for eviction and triggering: 1) including
-	 * the last data point, which is better/faster for count eviction; 2)
-	 * excluding the last data point, which is essentially required for time
-	 * based eviction and delta rules. As 2) is required for some policies and
-	 * the benefit of 1) is small for the others, policies implement 2).
-	 *
-	 * @param datapoint
-	 *            the data point which arrived
-	 * @return true if the current window should be closed, otherwise false. If
-	 *         true, the given data point will be part of the next window and
-	 *         will not be included in the current one.
-	 */
-	public boolean notifyTrigger(DATA datapoint);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
deleted file mode 100644
index 08c49e9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.policy;
-
-/**
- * This eviction policy deletes all elements from the buffer in case a trigger
- * occurred. Therefore, it is the default eviction policy to be used for any
- * tumbling window.
- *
- * By default this policy does not react to fake elements. Wrap it in an
- * {@link ActiveEvictionPolicyWrapper} to make it clear the buffer even on
- * fake elements.
- *
- * @param <DATA>
- *            The type of the data points which is handled by this policy
- */
-public class TumblingEvictionPolicy<DATA> implements CloneableEvictionPolicy<DATA> {
-
-	/**
-	 * Auto generated version ID
-	 */
-	private static final long serialVersionUID = -4018019069267281155L;
-
-	/**
-	 * Counter for the current number of elements in the buffer, maintained by notifyEviction()
-	 */
-	private int counter = 0;
-
-	/**
-	 * This is the default constructor providing no special functionality. This
-	 * eviction policy deletes all elements from the buffer in case a trigger
-	 * occurred. Therefore, it is the default eviction policy to be used for any
-	 * tumbling window.
-	 */
-	public TumblingEvictionPolicy() {
-		// default constructor, no further logic needed
-	}
-
-	/**
-	 * This constructor allows setting a custom start value for the element
-	 * counter.
-	 *
-	 * This eviction policy deletes all elements from the buffer in case a
-	 * trigger occurred. Therefore, it is the default eviction policy to be used
-	 * for any tumbling window.
-	 *
-	 * @param startValue
-	 *            A start value for the element counter
-	 */
-	public TumblingEvictionPolicy(int startValue) {
-		this.counter = startValue;
-	}
-
-	/**
-	 * On a trigger, returns the element counter and resets it to one; otherwise increments the counter and returns 0.
-	 */
-	@Override
-	public int notifyEviction(Object datapoint, boolean triggered, int bufferSize) { // datapoint and bufferSize are not read
-		if (triggered) {
-			// The current data point will be part of the next window!
-			// Therefore the counter needs to be set to one already.
-			int tmpCounter = counter;
-			counter = 1;
-			return tmpCounter;
-		} else {
-			counter++;
-			return 0;
-		}
-	}
-
-	@Override
-	public TumblingEvictionPolicy<DATA> clone() { // the copy starts from the current counter value
-		return new TumblingEvictionPolicy<DATA>(counter);
-	}
-
-	@Override
-	public boolean equals(Object other) { // any two instances are equal (counter is not compared); NOTE(review): this class declares no matching hashCode()
-		if (other == null || !(other instanceof TumblingEvictionPolicy)) {
-			return false;
-		} else {
-			return true;
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "TumblingPolicy";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
deleted file mode 100644
index 33fb29d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import java.util.LinkedList;
-import java.util.NoSuchElementException;
-
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-/**
- * Basic window buffer that stores the elements in a simple list without any
- * pre-aggregation.
- */
-public class BasicWindowBuffer<T> extends WindowBuffer<T> {
-
-	private static final long serialVersionUID = 1L;
-	protected LinkedList<T> buffer; // store() appends at the end, evict() removes from the front
-
-	public BasicWindowBuffer() {
-		this.buffer = new LinkedList<T>();
-	}
-
-	public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) { // emits one window holding all buffered elements; nothing is removed from the buffer here
-		if (emitEmpty || !buffer.isEmpty()) { // an empty buffer is emitted only when emitEmpty is set (emitEmpty is not declared in this class)
-			StreamWindow<T> currentWindow = createEmptyWindow();
-			currentWindow.addAll(buffer);
-			collector.collect(new StreamRecord<StreamWindow<T>>(currentWindow));
-		} 
-	}
-
-	public void store(T element) throws Exception {
-		buffer.add(element);
-	}
-
-	public void evict(int n) { // removes up to n elements from the front of the buffer
-		for (int i = 0; i < n; i++) {
-			try {
-				buffer.removeFirst();
-			} catch (NoSuchElementException e) {
-				// The buffer ran out of elements before n were removed:
-				// prevent failure and stop deleting.
-				break;
-			}
-		}
-	}
-
-	@Override
-	public BasicWindowBuffer<T> clone() { // returns a new, empty buffer: the buffered elements are not copied
-		return new BasicWindowBuffer<T>();
-	}
-
-	@Override
-	public String toString() {
-		return buffer.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducer.java
deleted file mode 100644
index 195a966..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-public class JumpingCountGroupedPreReducer<T> extends TumblingGroupedPreReducer<T> { // discards countToSkip elements after construction and after each emitWindow(), then delegates store() to the superclass
-
-	private static final long serialVersionUID = 1L;
-
-	private final long countToSkip; // Number of elements store() discards before it starts storing again
-	private long skipped = 0; // Number of elements discarded since the last emitWindow (or since construction)
-
-	public JumpingCountGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
-										TypeSerializer<T> serializer, long countToSkip) {
-		super(reducer, keySelector, serializer);
-		this.countToSkip = countToSkip;
-	}
-
-	@Override
-	public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
-		super.emitWindow(collector);
-		skipped = 0; // begin a new skip phase for the next window
-	}
-
-	@Override
-	public void store(T element) throws Exception {
-		if(skipped == countToSkip){ // skip phase finished: hand the element to the superclass
-			super.store(element);
-		} else {
-			skipped++; // element is discarded, only counted
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducer.java
deleted file mode 100644
index 17fe408..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-/**
- * Non-grouped pre-reducer for jumping count eviction policy
- * (the policies are based on count, and the slide size is larger than the window size).
- */
-public class JumpingCountPreReducer<T> extends TumblingPreReducer<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	private final long countToSkip; // Number of elements store() discards before it starts storing again
-	private long skipped = 0; // Number of elements discarded since the last emitWindow (or since construction)
-
-	public JumpingCountPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer, long countToSkip){
-		super(reducer, serializer);
-		this.countToSkip = countToSkip;
-	}
-
-	@Override
-	public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
-		super.emitWindow(collector);
-		skipped = 0; // begin a new skip phase for the next window
-	}
-
-	@Override
-	public void store(T element) throws Exception {
-		if(skipped == countToSkip){ // skip phase finished: hand the element to the superclass
-			super.store(element);
-		} else {
-			skipped++; // element is discarded, only counted
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimeGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimeGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimeGroupedPreReducer.java
deleted file mode 100644
index a92fc98..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimeGroupedPreReducer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-public class JumpingTimeGroupedPreReducer<T> extends TumblingGroupedPreReducer<T> { // only hands elements with timestamp >= windowStartTime to the superclass store()
-
-	private static final long serialVersionUID = 1L;
-
-	private TimestampWrapper<T> timestampWrapper; // supplies the start time and the per-element timestamps
-	protected long windowStartTime; // elements with a timestamp below this value are dropped by store()
-	private long slideSize; // amount windowStartTime advances on every emitWindow()
-
-	public JumpingTimeGroupedPreReducer(ReduceFunction<T> reducer, KeySelector<T, ?> keySelector,
-										TypeSerializer<T> serializer,
-										long slideSize, long windowSize, TimestampWrapper<T> timestampWrapper){
-		super(reducer, keySelector, serializer);
-		this.timestampWrapper = timestampWrapper;
-		this.windowStartTime = timestampWrapper.getStartTime() + slideSize - windowSize; // wrapper start time plus the gap (slideSize - windowSize)
-		this.slideSize = slideSize;
-	}
-
-	@Override
-	public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
-		super.emitWindow(collector);
-		windowStartTime += slideSize; // move the acceptance threshold forward by one slide
-	}
-
-	public void store(T element) throws Exception { // NOTE(review): no @Override here, unlike emitWindow() above - confirm it overrides the superclass store()
-		if(timestampWrapper.getTimestamp(element) >= windowStartTime) {
-			super.store(element);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducer.java
deleted file mode 100644
index 1a47bc8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-/**
- * Non-grouped pre-reducer for jumping time eviction policy
- * (the policies are based on time, and the slide size is larger than the window size).
- */
-public class JumpingTimePreReducer<T> extends TumblingPreReducer<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	private TimestampWrapper<T> timestampWrapper; // supplies the start time and the per-element timestamps
-	protected long windowStartTime; // elements with a timestamp below this value are dropped by store()
-	private long slideSize; // amount windowStartTime advances on every emitWindow()
-
-	public JumpingTimePreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
-								long slideSize, long windowSize, TimestampWrapper<T> timestampWrapper){
-		super(reducer, serializer);
-		this.timestampWrapper = timestampWrapper;
-		this.windowStartTime = timestampWrapper.getStartTime() + slideSize - windowSize; // wrapper start time plus the gap (slideSize - windowSize)
-		this.slideSize = slideSize;
-	}
-
-	@Override
-	public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
-		super.emitWindow(collector);
-		windowStartTime += slideSize; // move the acceptance threshold forward by one slide
-	}
-
-	public void store(T element) throws Exception { // NOTE(review): no @Override here, unlike emitWindow() above - confirm it overrides the superclass store()
-		if(timestampWrapper.getTimestamp(element) >= windowStartTime) {
-			super.store(element);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/PreAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/PreAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/PreAggregator.java
deleted file mode 100644
index 1b95248..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/PreAggregator.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-/**
- * Marker interface (it declares no methods) for window pre-aggregators that
- * fully process the window so that no further reduce step is necessary afterwards.
- */
-public interface PreAggregator {
-
-}