You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:50 UTC

[34/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java
deleted file mode 100644
index 3e1ff57..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/transformations/UnionTransformation.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.transformations;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This transformation represents a union of several input
- * {@link StreamTransformation StreamTransformations}.
- *
- * <p>
- * This does not create a physical operation, it only affects how upstream operations are
- * connected to downstream operations.
- *
- * @param <T> The type of the elements that result from this {@code UnionTransformation}
- */
-public class UnionTransformation<T> extends StreamTransformation<T> {
-	private final List<StreamTransformation<T>> inputs;
-
-	/**
-	 * Creates a new {@code UnionTransformation} from the given input {@code StreamTransformations}.
-	 *
-	 * <p>
-	 * The input {@code StreamTransformations} must all have the same type.
-	 *
-	 * @param inputs The list of input {@code StreamTransformations}
-	 */
-	public UnionTransformation(List<StreamTransformation<T>> inputs) {
-		super("Union", inputs.get(0).getOutputType(), inputs.get(0).getParallelism());
-
-		for (StreamTransformation<T> input: inputs) {
-			if (!input.getOutputType().equals(getOutputType())) {
-				throw new UnsupportedOperationException("Type mismatch in input " + input);
-			}
-		}
-
-		this.inputs = Lists.newArrayList(inputs);
-	}
-
-	/**
-	 * Returns the list of input {@code StreamTransformations}.
-	 */
-	public List<StreamTransformation<T>> getInputs() {
-		return inputs;
-	}
-
-	@Override
-	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
-		List<StreamTransformation<?>> result = Lists.newArrayList();
-		result.add(this);
-		for (StreamTransformation<T> input: inputs) {
-			result.addAll(input.getTransitivePredecessors());
-		}
-		return result;
-	}
-
-	@Override
-	public final void setChainingStrategy(ChainingStrategy strategy) {
-		throw new UnsupportedOperationException("Cannot set chaining strategy on Union Transformation.");
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
deleted file mode 100644
index 838c24a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.watermark;
-
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-
-/**
- * A Watermark tells operators that receive it that no elements with a timestamp older or equal
- * to the watermark timestamp should arrive at the operator. Watermarks are emitted at the
- * sources and propagate through the operators of the topology. Operators must themselves emit
- * watermarks to downstream operators using
- * {@link org.apache.flink.streaming.api.operators.Output#emitWatermark(Watermark)}. Operators that
- * do not internally buffer elements can always forward the watermark that they receive. Operators
- * that buffer elements, such as window operators, must forward a watermark after emission of
- * elements that is triggered by the arriving watermark.
- *
- * <p>
- * In some cases a watermark is only a heuristic and operators should be able to deal with
- * late elements. They can either discard those or update the result and emit updates/retractions
- * to downstream operations.
- *
- * <p>
- * When a source closes it will emit a final watermark with timestamp {@code Long.MAX_VALUE}. When
- * an operator receives this it will know that no more input will be arriving in the future.
- *
- */
-public class Watermark extends StreamElement {
-
-	/** The timestamp of the watermark */
-	private final long timestamp;
-
-	/**
-	 * Creates a new watermark with the given timestamp.
-	 */
-	public Watermark(long timestamp) {
-		this.timestamp = timestamp;
-	}
-
-	/**
-	 * Returns the timestamp associated with this {@link Watermark} in milliseconds.
-	 */
-	public long getTimestamp() {
-		return timestamp;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		return this == o ||
-				o != null && o.getClass() == Watermark.class && ((Watermark) o).timestamp == this.timestamp;
-	}
-
-	@Override
-	public int hashCode() {
-		return (int) (timestamp ^ (timestamp >>> 32));
-	}
-
-	@Override
-	public String toString() {
-		return "Watermark @ " + timestamp;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
deleted file mode 100644
index 4d5b9d7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.assigners;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * A {@link WindowAssigner} that assigns all elements to the same global window.
- *
- * <p>
- * Use this if you want to use a {@link Trigger} and
- * {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} to do flexible, policy based
- * windows.
- */
-public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
-	private static final long serialVersionUID = 1L;
-
-	private GlobalWindows() {}
-
-	@Override
-	public Collection<GlobalWindow> assignWindows(Object element, long timestamp) {
-		return Collections.singletonList(GlobalWindow.get());
-	}
-
-	@Override
-	public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
-		return new NeverTrigger();
-	}
-
-	@Override
-	public String toString() {
-		return "GlobalWindows()";
-	}
-
-	/**
-	 * Creates a new {@code GlobalWindows} {@link WindowAssigner} that assigns
-	 * all elements to the same {@link GlobalWindow}.
-	 *
-	 * @return The global window policy.
-	 */
-	public static GlobalWindows create() {
-		return new GlobalWindows();
-	}
-
-	/**
-	 * A trigger that never fires, as default Trigger for GlobalWindows.
-	 */
-	private static class NeverTrigger implements Trigger<Object, GlobalWindow> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public TriggerResult onElement(Object element,
-				long timestamp,
-				GlobalWindow window,
-				TriggerContext ctx) {
-				return TriggerResult.CONTINUE;
-		}
-
-		@Override
-		public TriggerResult onEventTime(long time, TriggerContext ctx) {
-			return TriggerResult.CONTINUE;
-		}
-
-		@Override
-		public TriggerResult onProcessingTime(long time, TriggerContext ctx) {
-			return TriggerResult.CONTINUE;
-		}
-	}
-
-	@Override
-	public TypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfig executionConfig) {
-		return new GlobalWindow.Serializer();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
deleted file mode 100644
index 5f7ab45..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.assigners;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * A {@link WindowAssigner} that windows elements into sliding windows based on the timestamp of the
- * elements. Windows can possibly overlap.
- *
- * <p>
- * For example, in order to window into windows of 1 minute, every 10 seconds:
- * <pre> {@code
- * DataStream<Tuple2<String, Integer>> in = ...;
- * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
- * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
- *   keyed.window(SlidingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
- * } </pre>
- */
-public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
-	private static final long serialVersionUID = 1L;
-
-	private final long size;
-
-	private final long slide;
-
-	private SlidingTimeWindows(long size, long slide) {
-		this.size = size;
-		this.slide = slide;
-	}
-
-	@Override
-	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
-		List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
-		long lastStart = timestamp - timestamp % slide;
-		for (long start = lastStart;
-			start > timestamp - size;
-			start -= slide) {
-			windows.add(new TimeWindow(start, start + size));
-		}
-		return windows;
-	}
-
-	public long getSize() {
-		return size;
-	}
-
-	public long getSlide() {
-		return slide;
-	}
-
-	@Override
-	public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
-		if (env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
-			return ProcessingTimeTrigger.create();
-		} else {
-			return EventTimeTrigger.create();
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "SlidingTimeWindows(" + size + ", " + slide + ")";
-	}
-
-	/**
-	 * Creates a new {@code SlidingTimeWindows} {@link WindowAssigner} that assigns
-	 * elements to sliding time windows based on the element timestamp.
-	 *
-	 * @param size The size of the generated windows.
-	 * @param slide The slide interval of the generated windows.
-	 * @return The time policy.
-	 */
-	public static SlidingTimeWindows of(AbstractTime size, AbstractTime slide) {
-		return new SlidingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
-	}
-
-	@Override
-	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
-		return new TimeWindow.Serializer();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
deleted file mode 100644
index 463b2c4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.assigners;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
- * elements. Windows cannot overlap.
- *
- * <p>
- * For example, in order to window into windows of 1 minute, every 10 seconds:
- * <pre> {@code
- * DataStream<Tuple2<String, Integer>> in = ...;
- * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
- * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
- *   keyed.window(TumblingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
- * } </pre>
- */
-public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
-	private static final long serialVersionUID = 1L;
-
-	private long size;
-
-	private TumblingTimeWindows(long size) {
-		this.size = size;
-	}
-
-	@Override
-	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
-		long start = timestamp - (timestamp % size);
-		return Collections.singletonList(new TimeWindow(start, start + size));
-	}
-
-	public long getSize() {
-		return size;
-	}
-
-	@Override
-	public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
-		if (env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
-			return ProcessingTimeTrigger.create();
-		} else {
-			return EventTimeTrigger.create();
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "TumblingTimeWindows(" + size + ")";
-	}
-
-	/**
-	 * Creates a new {@code TumblingTimeWindows} {@link WindowAssigner} that assigns
-	 * elements to time windows based on the element timestamp.
-	 *
-	 * @param size The size of the generated windows.
-	 * @return The time policy.
-	 */
-	public static TumblingTimeWindows of(AbstractTime size) {
-		return new TumblingTimeWindows(size.toMilliseconds());
-	}
-
-	@Override
-	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
-		return new TimeWindow.Serializer();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
deleted file mode 100644
index 4b4b1ea..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.assigners;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import java.io.Serializable;
-
-import java.util.Collection;
-
-/**
- * A {@code WindowAssigner} assigns zero or more {@link Window Windows} to an element.
- *
- * <p>
- * In a window operation, elements are grouped by their key (if available) and by the windows to
- * which it was assigned. The set of elements with the same key and window is called a pane.
- * When a {@link Trigger} decides that a certain pane should fire the
- * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} is applied
- * to produce output elements for that pane.
- *
- * @param <T> The type of elements that this WindowAssigner can assign windows to.
- * @param <W> The type of {@code Window} that this assigner assigns.
- */
-public abstract class WindowAssigner<T, W extends Window> implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Returns a {@code Collection} of windows that should be assigned to the element.
-	 *
-	 * @param element The element to which windows should be assigned.
-	 * @param timestamp The timestamp of the element.
-	 */
-	public abstract Collection<W> assignWindows(T element, long timestamp);
-
-	/**
-	 * Returns the default trigger associated with this {@code WindowAssigner}.
-	 */
-	public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
-
-	/**
-	 * Returns a {@link TypeSerializer} for serializing windows that are assigned by
-	 * this {@code WindowAssigner}.
-	 */
-	public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
deleted file mode 100644
index 0a078e2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.evictors;
-
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * An {@link Evictor} that keeps only a certain amount of elements.
- *
- * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
- */
-public class CountEvictor<W extends Window> implements Evictor<Object, W> {
-	private static final long serialVersionUID = 1L;
-
-	private final long maxCount;
-
-	private CountEvictor(long count) {
-		this.maxCount = count;
-	}
-
-	@Override
-	public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
-		if (size > maxCount) {
-			return (int) (size - maxCount);
-		} else {
-			return 0;
-		}
-	}
-
-	/**
-	 * Creates a {@code CountEvictor} that keeps the given number of elements.
-	 *
-	 * @param maxCount The number of elements to keep in the pane.
-	 */
-	public static <W extends Window> CountEvictor<W> of(long maxCount) {
-		return new CountEvictor<>(maxCount);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
deleted file mode 100644
index 0083a04..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.evictors;
-
-import com.google.common.collect.Iterables;
-import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * An {@link Evictor} that keeps elements based on a {@link DeltaFunction} and a threshold.
- *
- * <p>
- * Eviction starts from the first element of the buffer and removes all elements from the buffer
- * which have a higher delta then the threshold. As soon as there is an element with a lower delta,
- * the eviction stops.
- *
- * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
- */
-public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
-	private static final long serialVersionUID = 1L;
-
-	DeltaFunction<T> deltaFunction;
-	private double threshold;
-
-	private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction) {
-		this.deltaFunction = deltaFunction;
-		this.threshold = threshold;
-	}
-
-	@Override
-	public int evict(Iterable<StreamRecord<T>> elements, int size, W window) {
-		StreamRecord<T> lastElement = Iterables.getLast(elements);
-		int toEvict = 0;
-		for (StreamRecord<T> element : elements) {
-			if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) < this.threshold) {
-				break;
-			}
-			toEvict++;
-		}
-
-		return toEvict;
-	}
-
-	@Override
-	public String toString() {
-		return "DeltaEvictor(" +  deltaFunction + ", " + threshold + ")";
-	}
-
-	/**
-	 * Creates a {@code DeltaEvictor} from the given threshold and {@code DeltaFunction}.
-	 *
-	 * @param threshold The threshold
-	 * @param deltaFunction The {@code DeltaFunction}
-	 */
-	public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
-		return new DeltaEvictor<>(threshold, deltaFunction);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
deleted file mode 100644
index 5ceaf2f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.evictors;
-
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import java.io.Serializable;
-
-/**
- * An {@code Evictor} can remove elements from a pane before it is being processed and after
- * window evaluation was triggered by a
- * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
- *
- * <p>
- * A pane is the bucket of elements that have the same key (assigned by the
- * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
- * be in multiple panes of it was assigned to multiple windows by the
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
- * have their own instance of the {@code Evictor}.
- *
- * @param <T> The type of elements that this {@code Evictor} can evict.
- * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
- */
-public interface Evictor<T, W extends Window> extends Serializable {
-
-	/**
-	 * Computes how many elements should be removed from the pane. The result specifies how
-	 * many elements should be removed from the beginning.
-	 *
-	 * @param elements The elements currently in the pane.
-	 * @param size The current number of elements in the pane.
-	 * @param window The {@link Window}
-	 */
-	int evict(Iterable<StreamRecord<T>> elements, int size, W window);
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
deleted file mode 100644
index 5776d8d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.evictors;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * An {@link Evictor} that keeps elements for a certain amount of time. Elements older
- * than {@code current_time - keep_time} are evicted.
- *
- * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
- */
-public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
-	private static final long serialVersionUID = 1L;
-
-	private final long windowSize;
-
-	public TimeEvictor(long windowSize) {
-		this.windowSize = windowSize;
-	}
-
-	@Override
-	public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
-		int toEvict = 0;
-		long currentTime = Iterables.getLast(elements).getTimestamp();
-		long evictCutoff = currentTime - windowSize;
-		for (StreamRecord<Object> record: elements) {
-			if (record.getTimestamp() > evictCutoff) {
-				break;
-			}
-			toEvict++;
-		}
-		return toEvict;
-	}
-
-	@Override
-	public String toString() {
-		return "TimeEvictor(" + windowSize + ")";
-	}
-
-	@VisibleForTesting
-	public long getWindowSize() {
-		return windowSize;
-	}
-
-	/**
-	 * Creates a {@code TimeEvictor} that keeps the given number of elements.
-	 *
-	 * @param windowSize The amount of time for which to keep elements.
-	 */
-	public static <W extends Window> TimeEvictor<W> of(AbstractTime windowSize) {
-		return new TimeEvictor<>(windowSize.toMilliseconds());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
deleted file mode 100644
index 3f8fb60..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.time;
-
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Base class for {@link Time} implementations.
- */
-public abstract class AbstractTime {
-
-	/** The time unit for this policy's time interval */
-	private final TimeUnit unit;
-	
-	/** The size of the windows generated by this policy */
-	private final long size;
-
-
-	protected AbstractTime(long size, TimeUnit unit) {
-		this.unit = checkNotNull(unit, "time unit may not be null");
-		this.size = size;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Properties
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the time unit for this policy's time interval.
-	 * @return The time unit for this policy's time interval.
-	 */
-	public TimeUnit getUnit() {
-		return unit;
-	}
-
-	/**
-	 * Gets the length of this policy's time interval.
-	 * @return The length of this policy's time interval.
-	 */
-	public long getSize() {
-		return size;
-	}
-
-	/**
-	 * Converts the time interval to milliseconds.
-	 * @return The time interval in milliseconds.
-	 */
-	public long toMilliseconds() {
-		return unit.toMillis(size);
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	public abstract AbstractTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic);
-
-	@Override
-	public int hashCode() {
-		return 31 * (int) (size ^ (size >>> 32)) + unit.hashCode();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj != null && obj.getClass() == getClass()) {
-			AbstractTime that = (AbstractTime) obj;
-			return this.size == that.size && this.unit.equals(that.unit);
-		}
-		else {
-			return false;
-		}
-	}
-
-	@Override
-	public String toString() {
-		return getClass().getSimpleName() + " (" + size + ' ' + unit.name() + ')';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
deleted file mode 100644
index 6a4349c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/EventTime.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.time;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * The definition of an event time interval for windowing. See
- * {@link org.apache.flink.streaming.api.TimeCharacteristic#EventTime} for a definition
- * of event time.
- */
-public final class EventTime extends AbstractTime {
-
-	/** Instantiation only via factory method */
-	private EventTime(long size, TimeUnit unit) {
-		super(size, unit);
-	}
-
-	@Override
-	public EventTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
-		if (characteristic == TimeCharacteristic.EventTime || characteristic == TimeCharacteristic.IngestionTime) {
-			return this;
-		}
-		else {
-			throw new InvalidProgramException(
-					"Cannot use EventTime policy in a dataflow that runs on " + characteristic);
-		}
-	}
-	// ------------------------------------------------------------------------
-	//  Factory
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates an event time policy describing an event time interval.
-	 *
-	 * @param size The size of the generated windows.
-	 * @param unit The init (seconds, milliseconds) of the time interval.
-	 * @return The event time policy.
-	 */
-	public static EventTime of(long size, TimeUnit unit) {
-		return new EventTime(size, unit);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
deleted file mode 100644
index 4be6ed0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/ProcessingTime.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.time;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * The definition of a processing time interval for windowing. See
- * {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} for a definition
- * of processing time.
- */
-public final class ProcessingTime extends AbstractTime {
-
-	/** Instantiation only via factory method */
-	private ProcessingTime(long size, TimeUnit unit) {
-		super(size, unit);
-	}
-
-	@Override
-	public ProcessingTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic characteristic) {
-		if (characteristic == TimeCharacteristic.ProcessingTime) {
-			return this;
-		}
-		else {
-			throw new InvalidProgramException(
-					"Cannot use ProcessingTime policy in a dataflow that runs on " + characteristic);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Factory
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a processing time policy describing a processing time interval.
-	 *
-	 * @param size The size of the generated windows.
-	 * @param unit The init (seconds, milliseconds) of the time interval.
-	 * @return The processing time policy.
-	 */
-	public static ProcessingTime of(long size, TimeUnit unit) {
-		return new ProcessingTime(size, unit);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
deleted file mode 100644
index d1b3fe3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.time;
-
-import org.apache.flink.streaming.api.TimeCharacteristic;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * The definition of a time interval for windowing. The time characteristic referred
- * to is the default time characteristic set on the execution environment.
- */
-public final class Time extends AbstractTime {
-
-	/** Instantiation only via factory method */
-	private Time(long size, TimeUnit unit) {
-		super(size, unit);
-	}
-
-	@Override
-	public AbstractTime makeSpecificBasedOnTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
-		switch (timeCharacteristic) {
-			case ProcessingTime:
-				return ProcessingTime.of(getSize(), getUnit());
-			case IngestionTime:
-			case EventTime:
-				return EventTime.of(getSize(), getUnit());
-			default:
-				throw new IllegalArgumentException("Unknown time characteristic");
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Factory
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a time policy describing a processing time interval. The policy refers to the
-	 * time characteristic that is set on the dataflow via
-	 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#
-	 * setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}.
-	 *
-	 * @param size The size of the generated windows.
-	 * @param unit The init (seconds, milliseconds) of the time interval.
-	 * @return The time policy.
-	 */
-	public static Time of(long size, TimeUnit unit) {
-		return new Time(size, unit);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
deleted file mode 100644
index ea26309..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-/**
- * A {@link Trigger} that continuously fires based on a given time interval. This fires based
- * on {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}.
- *
- * @see org.apache.flink.streaming.api.watermark.Watermark
- *
- * @param <W> The type of {@link Window Windows} on which this trigger can operate.
- */
-public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Object, W> {
-	private static final long serialVersionUID = 1L;
-
-	private final long interval;
-
-	private ContinuousEventTimeTrigger(long interval) {
-		this.interval = interval;
-	}
-
-	@Override
-	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
-
-		OperatorState<Boolean> first = ctx.getKeyValueState("first", true);
-
-		if (first.value()) {
-			long start = timestamp - (timestamp % interval);
-			long nextFireTimestamp = start + interval;
-
-			ctx.registerEventTimeTimer(nextFireTimestamp);
-
-			first.update(false);
-			return TriggerResult.CONTINUE;
-		}
-		return TriggerResult.CONTINUE;
-	}
-
-	@Override
-	public TriggerResult onEventTime(long time, TriggerContext ctx) {
-		ctx.registerEventTimeTimer(time + interval);
-		return TriggerResult.FIRE;
-	}
-
-	@Override
-	public TriggerResult onProcessingTime(long time,
-			TriggerContext ctx) throws Exception {
-		return TriggerResult.CONTINUE;
-	}
-
-	@Override
-	public String toString() {
-		return "ContinuousProcessingTimeTrigger(" + interval + ")";
-	}
-
-	@VisibleForTesting
-	public long getInterval() {
-		return interval;
-	}
-
-	/**
-	 * Creates a trigger that continuously fires based on the given interval.
-	 *
-	 * @param interval The time interval at which to fire.
-	 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
-	 */
-	public static <W extends Window> ContinuousEventTimeTrigger<W> of(AbstractTime interval) {
-		return new ContinuousEventTimeTrigger<>(interval.toMilliseconds());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
deleted file mode 100644
index be56738..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-/**
- * A {@link Trigger} that continuously fires based on a given time interval. The time is the current
- * system time.
- *
- * @param <W> The type of {@link Window Windows} on which this trigger can operate.
- */
-public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigger<Object, W> {
-	private static final long serialVersionUID = 1L;
-
-	private final long interval;
-
-	private ContinuousProcessingTimeTrigger(long interval) {
-		this.interval = interval;
-	}
-
-	@Override
-	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
-		long currentTime = System.currentTimeMillis();
-
-		OperatorState<Long> fireState = ctx.getKeyValueState("fire-timestamp", 0L);
-		long nextFireTimestamp = fireState.value();
-
-		if (nextFireTimestamp == 0) {
-			long start = currentTime - (currentTime % interval);
-			fireState.update(start + interval);
-
-			ctx.registerProcessingTimeTimer(nextFireTimestamp);
-			return TriggerResult.CONTINUE;
-		}
-		if (currentTime > nextFireTimestamp) {
-			long start = currentTime - (currentTime % interval);
-			fireState.update(start + interval);
-
-			ctx.registerProcessingTimeTimer(nextFireTimestamp);
-
-			return TriggerResult.FIRE;
-		}
-		return TriggerResult.CONTINUE;
-	}
-
-	@Override
-	public TriggerResult onEventTime(long time,
-			TriggerContext ctx) throws Exception {
-		return TriggerResult.CONTINUE;
-	}
-
-	@Override
-	public TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception {
-
-		OperatorState<Long> fireState = ctx.getKeyValueState("fire-timestamp", 0L);
-		long nextFireTimestamp = fireState.value();
-
-		// only fire if an element didn't already fire
-		long currentTime = System.currentTimeMillis();
-		if (currentTime > nextFireTimestamp) {
-			long start = currentTime - (currentTime % interval);
-			fireState.update(start + interval);
-			return TriggerResult.FIRE;
-		}
-		return TriggerResult.CONTINUE;
-	}
-
-	@VisibleForTesting
-	public long getInterval() {
-		return interval;
-	}
-
-	@Override
-	public String toString() {
-		return "ContinuousProcessingTimeTrigger(" + interval + ")";
-	}
-
-	/**
-	 * Creates a trigger that continuously fires based on the given interval.
-	 *
-	 * @param interval The time interval at which to fire.
-	 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
-	 */
-	public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(AbstractTime interval) {
-		return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
deleted file mode 100644
index 8512989..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-import java.io.IOException;
-
-/**
- * A {@link Trigger} that fires once the count of elements in a pane reaches the given count.
- *
- * @param <W> The type of {@link Window Windows} on which this trigger can operate.
- */
-public class CountTrigger<W extends Window> implements Trigger<Object, W> {
-	private static final long serialVersionUID = 1L;
-
-	private final long maxCount;
-
-	private CountTrigger(long maxCount) {
-		this.maxCount = maxCount;
-	}
-
-	@Override
-	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException {
-		OperatorState<Long> count = ctx.getKeyValueState("count", 0L);
-		long currentCount = count.value() + 1;
-		count.update(currentCount);
-		if (currentCount >= maxCount) {
-			count.update(0L);
-			return TriggerResult.FIRE;
-		}
-		return TriggerResult.CONTINUE;
-	}
-
-	@Override
-	public TriggerResult onEventTime(long time, TriggerContext ctx) {
-		return TriggerResult.CONTINUE;
-	}
-
-	@Override
-	public TriggerResult onProcessingTime(long time,
-			TriggerContext ctx) throws Exception {
-		return TriggerResult.CONTINUE;
-	}
-
-	@Override
-	public String toString() {
-		return "CountTrigger(" +  maxCount + ")";
-	}
-
-	/**
-	 * Creates a trigger that fires once the number of elements in a pane reaches the given count.
-	 *
-	 * @param maxCount The count of elements at which to fire.
-	 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
-	 */
-	public static <W extends Window> CountTrigger<W> of(long maxCount) {
-		return new CountTrigger<>(maxCount);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
deleted file mode 100644
index 1c6523d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-import java.io.Serializable;
-
-/**
- * A {@link Trigger} that fires based on a {@link DeltaFunction} and a threshold.
- *
- * <p>
- * This trigger calculates a delta between the data point which triggered last
- * and the currently arrived data point. It triggers if the delta is higher than
- * a specified threshold.
- *
- * @param <W> The type of {@link Window Windows} on which this trigger can operate.
- */
-public class DeltaTrigger<T extends Serializable, W extends Window> implements Trigger<T, W> {
-	private static final long serialVersionUID = 1L;
-
-	private final DeltaFunction<T> deltaFunction;
-	private final double threshold;
-
-	private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction) {
-		this.deltaFunction = deltaFunction;
-		this.threshold = threshold;
-	}
-
-	@Override
-	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
-		OperatorState<T> lastElementState = ctx.getKeyValueState("last-element", null);
-		if (lastElementState.value() == null) {
-			lastElementState.update(element);
-			return TriggerResult.CONTINUE;
-		}
-		if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
-			lastElementState.update(element);
-			return TriggerResult.FIRE;
-		}
-		return TriggerResult.CONTINUE;
-	}
-
-	@Override
-	public TriggerResult onEventTime(long time, TriggerContext ctx) {
-		return TriggerResult.CONTINUE;
-	}
-
-	@Override
-	public TriggerResult onProcessingTime(long time,
-			TriggerContext ctx) throws Exception {
-		return TriggerResult.CONTINUE;
-	}
-
-	@Override
-	public String toString() {
-		return "DeltaTrigger(" +  deltaFunction + ", " + threshold + ")";
-	}
-
-	/**
-	 * Creates a delta trigger from the given threshold and {@code DeltaFunction}.
-	 *
-	 * @param threshold The threshold at which to trigger.
-	 * @param deltaFunction The delta function to use
-	 *
-	 * @param <T> The type of elements on which this trigger can operate.
-	 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
-	 */
-	public static <T extends Serializable, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
-		return new DeltaTrigger<>(threshold, deltaFunction);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
deleted file mode 100644
index 4b6613c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-/**
- * A {@link Trigger} that fires once the watermark passes the end of the window
- * to which a pane belongs.
- *
- * @see org.apache.flink.streaming.api.watermark.Watermark
- */
-public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
-	private static final long serialVersionUID = 1L;
-
-	private EventTimeTrigger() {}
-
-	@Override
-	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
-		ctx.registerEventTimeTimer(window.maxTimestamp());
-		return TriggerResult.CONTINUE;
-	}
-
-	@Override
-	public TriggerResult onEventTime(long time, TriggerContext ctx) {
-		return TriggerResult.FIRE_AND_PURGE;
-	}
-
-	@Override
-	public TriggerResult onProcessingTime(long time,
-			TriggerContext ctx) throws Exception {
-		return TriggerResult.CONTINUE;
-	}
-
-	@Override
-	public String toString() {
-		return "EventTimeTrigger()";
-	}
-
-	/**
-	 * Creates trigger that fires once the watermark passes the end of the window.
-	 */
-	public static EventTimeTrigger create() {
-		return new EventTimeTrigger();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
deleted file mode 100644
index 6278ba6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-/**
- * A {@link Trigger} that fires once the current system time passes the end of the window
- * to which a pane belongs.
- */
-public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
-	private static final long serialVersionUID = 1L;
-
-	private ProcessingTimeTrigger() {}
-
-	@Override
-	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
-		ctx.registerProcessingTimeTimer(window.maxTimestamp());
-		return TriggerResult.CONTINUE;
-	}
-
-	@Override
-	public TriggerResult onEventTime(long time,
-			TriggerContext ctx) throws Exception {
-		return TriggerResult.CONTINUE;
-	}
-
-	@Override
-	public TriggerResult onProcessingTime(long time, TriggerContext ctx) {
-		return TriggerResult.FIRE_AND_PURGE;
-	}
-
-	@Override
-	public String toString() {
-		return "ProcessingTimeTrigger()";
-	}
-
-	/**
-	 * Creates a new trigger that fires once system time passes the end of the window.
-	 */
-	public static ProcessingTimeTrigger create() {
-		return new ProcessingTimeTrigger();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
deleted file mode 100644
index eaca336..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-/**
- * A trigger that can turn any {@link Trigger} into a purging {@code Trigger}.
- *
- * <p>
- * When the nested trigger fires, this will return a {@code FIRE_AND_PURGE}
- * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerResult}
- *
- * @param <T> The type of elements on which this trigger can operate.
- * @param <W> The type of {@link Window Windows} on which this trigger can operate.
- */
-public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
-	private static final long serialVersionUID = 1L;
-
-	private Trigger<T, W> nestedTrigger;
-
-	private  PurgingTrigger(Trigger<T, W> nestedTrigger) {
-		this.nestedTrigger = nestedTrigger;
-	}
-
-	@Override
-	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
-		TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
-		switch (triggerResult) {
-			case FIRE:
-				return TriggerResult.FIRE_AND_PURGE;
-			case FIRE_AND_PURGE:
-				return TriggerResult.FIRE_AND_PURGE;
-			default:
-				return TriggerResult.CONTINUE;
-		}
-	}
-
-	@Override
-	public TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception {
-		TriggerResult triggerResult = nestedTrigger.onEventTime(time, ctx);
-		switch (triggerResult) {
-			case FIRE:
-				return TriggerResult.FIRE_AND_PURGE;
-			case FIRE_AND_PURGE:
-				return TriggerResult.FIRE_AND_PURGE;
-			default:
-				return TriggerResult.CONTINUE;
-		}
-	}
-
-	@Override
-	public TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception {
-		TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, ctx);
-		switch (triggerResult) {
-			case FIRE:
-				return TriggerResult.FIRE_AND_PURGE;
-			case FIRE_AND_PURGE:
-				return TriggerResult.FIRE_AND_PURGE;
-			default:
-				return TriggerResult.CONTINUE;
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "PurgingTrigger(" + nestedTrigger.toString() + ")";
-	}
-
-	/**
-	 * Creates a new purging trigger from the given {@code Trigger}.
-	 *
-	 * @param nestedTrigger The trigger that is wrapped by this purging trigger
-	 */
-	public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
-		return new PurgingTrigger<>(nestedTrigger);
-	}
-
-	@VisibleForTesting
-	public Trigger<T, W> getNestedTrigger() {
-		return nestedTrigger;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
deleted file mode 100644
index ef8110b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.triggers;
-
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-
-import java.io.Serializable;
-
-/**
- * A {@code Trigger} determines when a pane of a window should be evaluated to emit the
- * results for that part of the window.
- *
- * <p>
- * A pane is the bucket of elements that have the same key (assigned by the
- * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
- * be in multiple panes of it was assigned to multiple windows by the
- * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
- * have their own instance of the {@code Trigger}.
- *
- * <p>
- * Triggers must not maintain state internally since they can be re-created or reused for
- * different keys. All necessary state should be persisted using the state abstraction
- * available on the {@link org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext}.
- *
- * @param <T> The type of elements on which this {@code Trigger} works.
- * @param <W> The type of {@link Window Windows} on which this {@code Trigger} can operate.
- */
-public interface Trigger<T, W extends Window> extends Serializable {
-
-	/**
-	 * Called for every element that gets added to a pane. The result of this will determine
-	 * whether the pane is evaluated to emit results.
-	 *
-	 * @param element The element that arrived.
-	 * @param timestamp The timestamp of the element that arrived.
-	 * @param window The window to which this pane belongs.
-	 * @param ctx A context object that can be used to register timer callbacks.
-	 */
-	TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
-
-	/**
-	 * Called when a processing-time timer that was set using the trigger context fires.
-	 *
-	 * @param time The timestamp at which the timer fired.
-	 * @param ctx A context object that can be used to register timer callbacks.
-	 */
-	TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception;
-
-	/**
-	 * Called when an event-time timer that was set using the trigger context fires.
-	 *
-	 * @param time The timestamp at which the timer fired.
-	 * @param ctx A context object that can be used to register timer callbacks.
-	 */
-	TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception;
-
-
-	/**
-	 * Result type for trigger methods. This determines what happens which the window.
-	 *
-	 * <p>
-	 * On {@code FIRE} the pane is evaluated and results are emitted. The contents of the window
-	 * are kept. {@code FIRE_AND_PURGE} acts like {@code FIRE} but the contents of the pane
-	 * are purged. On {@code CONTINUE} nothing happens, processing continues.
-	 */
-	enum TriggerResult {
-		CONTINUE, FIRE_AND_PURGE, FIRE
-	}
-
-	/**
-	 * A context object that is given to {@code Trigger} methods to allow them to register timer
-	 * callbacks and deal with state.
-	 */
-	interface TriggerContext {
-
-		/**
-		 * Register a system time callback. When the current system time passes the specified
-		 * time {@link #onProcessingTime(long, TriggerContext)} is called with the time specified here.
-		 *
-		 * @param time The time at which to invoke {@link #onProcessingTime(long, TriggerContext)}
-		 */
-		void registerProcessingTimeTimer(long time);
-
-		/**
-		 * Register an event-time callback. When the current watermark passes the specified
-		 * time {@link #onEventTime(long, TriggerContext)} is called with the time specified here.
-		 *
-		 * @see org.apache.flink.streaming.api.watermark.Watermark
-		 *
-		 * @param time The watermark at which to invoke {@link #onEventTime(long, TriggerContext)}
-		 */
-		void registerEventTimeTimer(long time);
-
-		/**
-		 * Retrieves an {@link OperatorState} object that can be used to interact with
-		 * fault-tolerant state that is scoped to the window and key of the current
-		 * trigger invocation.
-		 *
-		 * @param name A unique key for the state.
-		 * @param defaultState The default value of the state.
-		 */
-		<S extends Serializable> OperatorState<S> getKeyValueState(final String name, final S defaultState);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
deleted file mode 100644
index f20c779..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.windows;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-
-public class GlobalWindow extends Window {
-
-	private static GlobalWindow INSTANCE = new GlobalWindow();
-
-	private GlobalWindow() { }
-
-	public static GlobalWindow get() {
-		return INSTANCE;
-	}
-
-	@Override
-	public long maxTimestamp() {
-		return Long.MAX_VALUE;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		return this == o || !(o == null || getClass() != o.getClass());
-	}
-
-	@Override
-	public int hashCode() {
-		return 0;
-	}
-
-	@Override
-	public String toString() {
-		return "GlobalWindow";
-	}
-
-	public static class Serializer extends TypeSerializer<GlobalWindow> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean isImmutableType() {
-			return true;
-		}
-
-		@Override
-		public TypeSerializer<GlobalWindow> duplicate() {
-			return this;
-		}
-
-		@Override
-		public GlobalWindow createInstance() {
-			return GlobalWindow.INSTANCE;
-		}
-
-		@Override
-		public GlobalWindow copy(GlobalWindow from) {
-			return from;
-		}
-
-		@Override
-		public GlobalWindow copy(GlobalWindow from, GlobalWindow reuse) {
-			return from;
-		}
-
-		@Override
-		public int getLength() {
-			return 0;
-		}
-
-		@Override
-		public void serialize(GlobalWindow record, DataOutputView target) throws IOException {
-			target.writeByte(0);
-		}
-
-		@Override
-		public GlobalWindow deserialize(DataInputView source) throws IOException {
-			source.readByte();
-			return GlobalWindow.INSTANCE;
-		}
-
-		@Override
-		public GlobalWindow deserialize(GlobalWindow reuse,
-				DataInputView source) throws IOException {
-			source.readByte();
-			return GlobalWindow.INSTANCE;
-		}
-
-		@Override
-		public void copy(DataInputView source, DataOutputView target) throws IOException {
-			source.readByte();
-			target.writeByte(0);
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			return obj instanceof Serializer;
-		}
-
-		@Override
-		public boolean canEqual(Object obj) {
-			return obj instanceof Serializer;
-		}
-
-		@Override
-		public int hashCode() {
-			return 0;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
deleted file mode 100644
index 0c4c2a8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.windows;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-
-/**
- * A {@link Window} that represents a time interval from {@code start} (inclusive) to
- * {@code start + size} (exclusive).
- */
-public class TimeWindow extends Window {
-
-	private final long start;
-	private final long end;
-
-	public TimeWindow(long start, long end) {
-		this.start = start;
-		this.end = end;
-	}
-
-	public long getStart() {
-		return start;
-	}
-
-	public long getEnd() {
-		return end;
-	}
-
-	@Override
-	public long maxTimestamp() {
-		return end - 1;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		TimeWindow window = (TimeWindow) o;
-
-		return end == window.end && start == window.start;
-	}
-
-	@Override
-	public int hashCode() {
-		int result = (int) (start ^ (start >>> 32));
-		result = 31 * result + (int) (end ^ (end >>> 32));
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return "TimeWindow{" +
-				"start=" + start +
-				", end=" + end +
-				'}';
-	}
-
-	public static class Serializer extends TypeSerializer<TimeWindow> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean isImmutableType() {
-			return true;
-		}
-
-		@Override
-		public TypeSerializer<TimeWindow> duplicate() {
-			return this;
-		}
-
-		@Override
-		public TimeWindow createInstance() {
-			return null;
-		}
-
-		@Override
-		public TimeWindow copy(TimeWindow from) {
-			return from;
-		}
-
-		@Override
-		public TimeWindow copy(TimeWindow from, TimeWindow reuse) {
-			return from;
-		}
-
-		@Override
-		public int getLength() {
-			return 0;
-		}
-
-		@Override
-		public void serialize(TimeWindow record, DataOutputView target) throws IOException {
-			target.writeLong(record.start);
-			target.writeLong(record.end);
-		}
-
-		@Override
-		public TimeWindow deserialize(DataInputView source) throws IOException {
-			long start = source.readLong();
-			long end = source.readLong();
-			return new TimeWindow(start, end);
-		}
-
-		@Override
-		public TimeWindow deserialize(TimeWindow reuse, DataInputView source) throws IOException {
-			long start = source.readLong();
-			long end = source.readLong();
-			return new TimeWindow(start, end);
-		}
-
-		@Override
-		public void copy(DataInputView source, DataOutputView target) throws IOException {
-			target.writeLong(source.readLong());
-			target.writeLong(source.readLong());
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			return obj instanceof Serializer;
-		}
-
-		@Override
-		public boolean canEqual(Object obj) {
-			return obj instanceof Serializer;
-		}
-
-		@Override
-		public int hashCode() {
-			return 0;
-		}
-	}
-
-}