You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:16:45 UTC

[09/10] flink git commit: [FLINK-2780] Remove Old Windowing Logic and API

[FLINK-2780] Remove Old Windowing Logic and API

This rewrites the few examples and tests that are remaining using the
new Windowing API.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c1141ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c1141ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c1141ab

Branch: refs/heads/master
Commit: 0c1141abcd0f58dcf2ec9abcccd47c5e5410b074
Parents: b2b2781
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sun Oct 4 10:33:06 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 9 11:15:59 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  89 --
 .../api/datastream/DiscretizedStream.java       | 341 --------
 .../api/datastream/WindowedDataStream.java      | 867 -------------------
 .../api/functions/RichWindowMapFunction.java    |  40 -
 .../api/functions/WindowMapFunction.java        |  35 -
 .../windowing/delta/CosineDistance.java         |   2 +-
 .../windowing/delta/EuclideanDistance.java      |   2 +-
 .../delta/ExtractionAwareDeltaFunction.java     |   2 +-
 .../delta/extractor/ArrayFromTuple.java         |  74 ++
 .../delta/extractor/ConcatenatedExtract.java    |  68 ++
 .../windowing/delta/extractor/Extractor.java    |  43 +
 .../delta/extractor/FieldFromArray.java         |  59 ++
 .../delta/extractor/FieldFromTuple.java         |  58 ++
 .../delta/extractor/FieldsFromArray.java        |  67 ++
 .../delta/extractor/FieldsFromTuple.java        |  53 ++
 .../flink/streaming/api/graph/StreamGraph.java  |   4 -
 .../streaming/api/graph/WindowingOptimizer.java | 161 ----
 .../operators/windowing/EmptyWindowFilter.java  |  32 -
 .../windowing/GroupedActiveDiscretizer.java     | 129 ---
 .../windowing/GroupedStreamDiscretizer.java     | 132 ---
 .../windowing/GroupedWindowBuffer.java          |  69 --
 .../windowing/ParallelGroupedMerge.java         |  41 -
 .../api/operators/windowing/ParallelMerge.java  | 145 ----
 .../windowing/ParallelMergeOperator.java        |  43 -
 .../operators/windowing/StreamDiscretizer.java  | 237 -----
 .../operators/windowing/StreamWindowBuffer.java |  69 --
 .../operators/windowing/WindowFlattener.java    |  51 --
 .../api/operators/windowing/WindowFolder.java   | 100 ---
 .../api/operators/windowing/WindowMapper.java   |  94 --
 .../api/operators/windowing/WindowMerger.java   |  73 --
 .../windowing/WindowPartExtractor.java          |  55 --
 .../operators/windowing/WindowPartitioner.java  |  79 --
 .../api/operators/windowing/WindowReducer.java  |  99 ---
 .../streaming/api/windowing/StreamWindow.java   | 276 ------
 .../api/windowing/StreamWindowSerializer.java   | 148 ----
 .../api/windowing/StreamWindowTypeInfo.java     | 104 ---
 .../streaming/api/windowing/WindowEvent.java    |  71 --
 .../streaming/api/windowing/WindowUtils.java    | 203 -----
 .../api/windowing/evictors/TimeEvictor.java     |   3 +-
 .../api/windowing/extractor/ArrayFromTuple.java |  74 --
 .../extractor/ConcatenatedExtract.java          |  68 --
 .../api/windowing/extractor/Extractor.java      |  43 -
 .../api/windowing/extractor/FieldFromArray.java |  59 --
 .../api/windowing/extractor/FieldFromTuple.java |  58 --
 .../windowing/extractor/FieldsFromArray.java    |  67 --
 .../windowing/extractor/FieldsFromTuple.java    |  53 --
 .../streaming/api/windowing/helper/Count.java   | 101 ---
 .../streaming/api/windowing/helper/Delta.java   | 105 ---
 .../api/windowing/helper/FullStream.java        |  57 --
 .../api/windowing/helper/SystemTimestamp.java   |  37 -
 .../streaming/api/windowing/helper/Time.java    | 153 ----
 .../api/windowing/helper/Timestamp.java         |  39 -
 .../api/windowing/helper/TimestampWrapper.java  |  65 --
 .../api/windowing/helper/WindowingHelper.java   |  61 --
 .../ActiveCloneableEvictionPolicyWrapper.java   |  62 --
 .../windowing/policy/ActiveEvictionPolicy.java  |  52 --
 .../policy/ActiveEvictionPolicyWrapper.java     |  64 --
 .../windowing/policy/ActiveTriggerCallback.java |  45 -
 .../windowing/policy/ActiveTriggerPolicy.java   |  81 --
 .../windowing/policy/CentralActiveTrigger.java  |  45 -
 .../policy/CloneableEvictionPolicy.java         |  44 -
 .../policy/CloneableMultiEvictionPolicy.java    |  84 --
 .../policy/CloneableMultiTriggerPolicy.java     |  63 --
 .../policy/CloneableTriggerPolicy.java          |  44 -
 .../windowing/policy/CountEvictionPolicy.java   | 155 ----
 .../windowing/policy/CountTriggerPolicy.java    | 117 ---
 .../api/windowing/policy/DeltaPolicy.java       | 167 ----
 .../api/windowing/policy/EvictionPolicy.java    |  47 -
 .../windowing/policy/KeepAllEvictionPolicy.java |  29 -
 .../windowing/policy/MultiEvictionPolicy.java   | 170 ----
 .../windowing/policy/MultiTriggerPolicy.java    | 123 ---
 .../api/windowing/policy/PunctuationPolicy.java | 147 ----
 .../windowing/policy/TimeEvictionPolicy.java    | 167 ----
 .../api/windowing/policy/TimeTriggerPolicy.java | 209 -----
 .../api/windowing/policy/TriggerPolicy.java     |  54 --
 .../policy/TumblingEvictionPolicy.java          | 104 ---
 .../windowbuffer/BasicWindowBuffer.java         |  73 --
 .../JumpingCountGroupedPreReducer.java          |  54 --
 .../windowbuffer/JumpingCountPreReducer.java    |  56 --
 .../JumpingTimeGroupedPreReducer.java           |  56 --
 .../windowbuffer/JumpingTimePreReducer.java     |  58 --
 .../windowing/windowbuffer/PreAggregator.java   |  27 -
 .../SlidingCountGroupedPreReducer.java          |  83 --
 .../windowbuffer/SlidingCountPreReducer.java    |  81 --
 .../windowbuffer/SlidingGroupedPreReducer.java  | 153 ----
 .../windowbuffer/SlidingPreReducer.java         | 175 ----
 .../SlidingTimeGroupedPreReducer.java           | 102 ---
 .../windowbuffer/SlidingTimePreReducer.java     | 102 ---
 .../windowbuffer/TumblingGroupedPreReducer.java | 107 ---
 .../windowbuffer/TumblingPreReducer.java        |  98 ---
 .../windowing/windowbuffer/WindowBuffer.java    |  61 --
 .../flink/streaming/api/DataStreamTest.java     | 125 +--
 .../api/complex/ComplexIntegrationTest.java     | 198 +++--
 .../delta/extractor/ArrayFromTupleTest.java     | 118 +++
 .../extractor/ConcatenatedExtractTest.java      |  77 ++
 .../delta/extractor/FieldFromArrayTest.java     |  54 ++
 .../delta/extractor/FieldFromTupleTest.java     |  83 ++
 .../delta/extractor/FieldsFromArrayTest.java    | 108 +++
 .../delta/extractor/FieldsFromTupleTest.java    | 106 +++
 .../api/operators/co/SelfConnectionTest.java    |  18 -
 .../windowing/GroupedStreamDiscretizerTest.java | 104 ---
 .../windowing/ParallelMergeITCase.java          | 101 ---
 .../operators/windowing/ParallelMergeTest.java  | 122 ---
 .../windowing/StreamDiscretizerTest.java        | 117 ---
 .../windowing/WindowFlattenerTest.java          |  53 --
 .../operators/windowing/WindowFolderTest.java   |  61 --
 .../operators/windowing/WindowMapperTest.java   |  60 --
 .../operators/windowing/WindowMergerTest.java   |  75 --
 .../windowing/WindowPartitionerTest.java        |  75 --
 .../operators/windowing/WindowReducerTest.java  |  61 --
 .../operators/windowing/WindowingITCase.java    | 529 -----------
 .../api/windowing/StreamWindowTest.java         | 201 -----
 .../api/windowing/StreamWindowTypeInfoTest.java |  51 --
 .../windowing/extractor/ArrayFromTupleTest.java | 118 ---
 .../extractor/ConcatenatedExtractTest.java      |  77 --
 .../windowing/extractor/FieldFromArrayTest.java |  55 --
 .../windowing/extractor/FieldFromTupleTest.java |  84 --
 .../extractor/FieldsFromArrayTest.java          | 108 ---
 .../extractor/FieldsFromTupleTest.java          | 106 ---
 .../policy/CountEvictionPolicyTest.java         | 136 ---
 .../policy/CountTriggerPolicyTest.java          | 109 ---
 .../api/windowing/policy/DeltaPolicyTest.java   |  88 --
 .../policy/MultiEvictionPolicyTest.java         | 186 ----
 .../policy/MultiTriggerPolicyTest.java          | 245 ------
 .../windowing/policy/PunctuationPolicyTest.java | 155 ----
 .../policy/TimeEvictionPolicyTest.java          | 164 ----
 .../windowing/policy/TimeTriggerPolicyTest.java | 156 ----
 .../policy/TumblingEvictionPolicyTest.java      |  43 -
 .../windowbuffer/BasicWindowBufferTest.java     |  86 --
 .../JumpingCountGroupedPreReducerTest.java      | 157 ----
 .../JumpingCountPreReducerTest.java             | 107 ---
 .../windowbuffer/JumpingTimePreReducerTest.java |  96 --
 .../SlidingCountGroupedPreReducerTest.java      | 235 -----
 .../SlidingCountPreReducerTest.java             | 216 -----
 .../SlidingTimeGroupedPreReducerTest.java       | 387 ---------
 .../windowbuffer/SlidingTimePreReducerTest.java | 324 -------
 .../TumblingGroupedPreReducerTest.java          | 151 ----
 .../windowbuffer/TumblingPreReducerTest.java    | 104 ---
 .../ml/IncrementalLearningSkeleton.java         |  48 +-
 .../util/IncrementalLearningSkeletonData.java   |  16 +-
 .../examples/windowing/SessionWindowing.java    |  84 +-
 .../examples/windowing/TopSpeedWindowing.java   |  49 +-
 .../examples/windowing/WindowWordCount.java     |  13 +-
 .../util/TopSpeedWindowingExampleData.java      |  96 +-
 .../examples/windowing/TopSpeedWindowing.scala  |  28 +-
 .../streaming/api/scala/AllWindowedStream.scala |  25 +
 .../flink/streaming/api/scala/DataStream.scala  |  36 +-
 .../api/scala/WindowedDataStream.scala          | 338 --------
 .../streaming/api/scala/WindowedStream.scala    |  25 +
 .../flink/streaming/api/scala/package.scala     |   7 +-
 .../streaming/api/scala/windowing/Delta.scala   |  46 -
 .../streaming/api/scala/windowing/Time.scala    |  53 --
 .../streaming/api/scala/DataStreamTest.scala    |  51 +-
 .../StreamingScalaAPICompletenessTest.scala     |   7 +-
 154 files changed, 1440 insertions(+), 14289 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index ee8b3d2..80e0e47 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -62,13 +62,6 @@ import org.apache.flink.streaming.api.transformations.UnionTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.streaming.api.windowing.helper.Delta;
-import org.apache.flink.streaming.api.windowing.helper.FullStream;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
@@ -630,88 +623,6 @@ public class DataStream<T> {
 	}
 
 	/**
-	 * Create a {@link WindowedDataStream} that can be used to apply
-	 * transformation like {@link WindowedDataStream#reduceWindow},
-	 * {@link WindowedDataStream#mapWindow} or aggregations on preset
-	 * chunks(windows) of the data stream. To define windows a
-	 * {@link WindowingHelper} such as {@link Time}, {@link Count},
-	 * {@link Delta} and {@link FullStream} can be used.
-	 *
-	 * <p>
-	 * When applied to a grouped data stream, the windows (evictions) and slide sizes
-	 * (triggers) will be computed on a per group basis.
-	 *
-	 * <p>
-	 * For more advanced control over the trigger and eviction policies please refer to
-	 * {@link #window(TriggerPolicy, EvictionPolicy)}
-	 *
-	 * <p>
-	 * For example, to create a sum every 5 seconds in a tumbling fashion:
-	 *
-	 * <pre>
-	 * {@code ds.window(Time.of(5, TimeUnit.SECONDS)).sum(field)}
-	 * </pre>
-	 *
-	 * <p>
-	 * To create sliding windows use the
-	 * {@link WindowedDataStream#every(WindowingHelper)}, for example with 3 second slides:</br>
-	 *
-	 * <pre>
-	 * 
-	 * {@code
-	 * ds.window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(3, TimeUnit.SECONDS)).sum(field)
-	 * }
-	 *
-	 * </pre>
-	 * 
-	 * @param policyHelper
-	 *            Any {@link WindowingHelper} such as {@link Time},
-	 *            {@link Count}, {@link Delta} {@link FullStream} to define the
-	 *            window size.
-	 *
-	 * @return A {@link WindowedDataStream} providing further operations.
-	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public WindowedDataStream<T> window(WindowingHelper policyHelper) {
-		policyHelper.setExecutionConfig(getExecutionConfig());
-		return new WindowedDataStream<T>(this, policyHelper);
-	}
-
-	/**
-	 * Create a {@link WindowedDataStream} using the given {@link TriggerPolicy}
-	 * and {@link EvictionPolicy}. Windowing can be used to apply transformation
-	 * like {@link WindowedDataStream#reduceWindow},
-	 * {@link WindowedDataStream#mapWindow} or aggregations on preset
-	 * chunks(windows) of the data stream.
-	 *
-	 * <p>
-	 * For most common use-cases please refer to {@link #window(WindowingHelper)}
-	 * 
-	 * @param trigger
-	 *            The {@link TriggerPolicy} that will determine how often the
-	 *            user function is called on the window.
-	 * @param eviction
-	 *            The {@link EvictionPolicy} that will determine the number of
-	 *            elements in each time window.
-	 * @return A {@link WindowedDataStream} providing further operations.
-	 */
-	public WindowedDataStream<T> window(TriggerPolicy<T> trigger, EvictionPolicy<T> eviction) {
-		return new WindowedDataStream<T>(this, trigger, eviction);
-	}
-
-	/**
-	 * Create a {@link WindowedDataStream} on the full stream history, to
-	 * produce periodic aggregates.
-	 * 
-	 * @return A {@link WindowedDataStream} providing further operations.
-	 */
-	@SuppressWarnings("rawtypes")
-	public WindowedDataStream<T> every(WindowingHelper policyHelper) {
-		policyHelper.setExecutionConfig(getExecutionConfig());
-		return window(FullStream.window()).every(policyHelper);
-	}
-
-	/**
 	 * Windows this {@code DataStream} into tumbling time windows.
 	 *
 	 * <p>

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
deleted file mode 100644
index 18c2cee..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamFilter;
-import org.apache.flink.streaming.api.operators.StreamFlatMap;
-import org.apache.flink.streaming.api.operators.windowing.EmptyWindowFilter;
-import org.apache.flink.streaming.api.operators.windowing.ParallelGroupedMerge;
-import org.apache.flink.streaming.api.operators.windowing.ParallelMerge;
-import org.apache.flink.streaming.api.operators.windowing.ParallelMergeOperator;
-import org.apache.flink.streaming.api.operators.windowing.WindowFlattener;
-import org.apache.flink.streaming.api.operators.windowing.WindowFolder;
-import org.apache.flink.streaming.api.operators.windowing.WindowMapper;
-import org.apache.flink.streaming.api.operators.windowing.WindowMerger;
-import org.apache.flink.streaming.api.operators.windowing.WindowPartExtractor;
-import org.apache.flink.streaming.api.operators.windowing.WindowPartitioner;
-import org.apache.flink.streaming.api.operators.windowing.WindowReducer;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
-import org.apache.flink.streaming.api.windowing.WindowUtils.WindowKey;
-import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation;
-
-/**
- * A {@link DiscretizedStream} represents a data stream that has been divided
- * into windows (predefined chunks). User defined function such as
- * {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow(WindowMapFunction)},
- * or aggregations can be applied to the windows.
- * 
- * @param <OUT>
- *            The output type of the {@link DiscretizedStream}
- */
-public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
-
-	private SingleOutputStreamOperator<StreamWindow<OUT>, ?> discretizedStream;
-	private WindowTransformation transformation;
-	protected boolean isPartitioned = false;
-
-	protected DiscretizedStream(SingleOutputStreamOperator<StreamWindow<OUT>, ?> discretizedStream,
-			KeySelector<OUT, ?> keyByKey, WindowTransformation tranformation,
-			boolean isPartitioned) {
-		super();
-		this.keyByKey = keyByKey;
-		this.discretizedStream = discretizedStream;
-		this.transformation = tranformation;
-		this.isPartitioned = isPartitioned;
-	}
-
-	/**
-	 * Gets the name of the current data stream. This name is
-	 * used by the visualization and logging during runtime.
-	 *
-	 * @return Name of the stream.
-	 */
-	public String getName(){
-		return discretizedStream.getName();
-	}
-
-	/**
-	 * Sets the name of the current data stream. This name is
-	 * used by the visualization and logging during runtime.
-	 *
-	 * @return The named operator.
-	 */
-	public DiscretizedStream<OUT> name(String name){
-		discretizedStream.name(name);
-		return this;
-	}
-
-	public DataStream<OUT> flatten() {
-		return discretizedStream.transform("Window Flatten", getType(), new WindowFlattener<OUT>()).setParallelism(discretizedStream.getParallelism());
-	}
-
-	public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
-		return discretizedStream;
-	}
-
-	@Override
-	public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
-
-		DiscretizedStream<OUT> out = partition(transformation).transform(
-				WindowTransformation.REDUCEWINDOW, "Window Reduce", getType(),
-				new WindowReducer<OUT>(reduceFunction)).merge();
-
-		// If we merged a non-grouped reduce transformation we need to reduce
-		// again
-		if (!isGrouped() && ((OneInputTransformation<?, ?>)out.discretizedStream.getTransformation()).getOperator() instanceof WindowMerger) {
-			return out.transform(WindowTransformation.REDUCEWINDOW, "Window Reduce", out.getType(),
-					new WindowReducer<OUT>(discretizedStream.clean(reduceFunction)));
-		} else {
-			return out;
-		}
-	}
-
-	/**
-	 * This method implements the parallel time reduce logic for time windows
-	 * 
-	 * @param reduceFunction
-	 *            The reduce function to be applied on the windows
-	 * @return The reduced DataStream
-	 */
-	protected DiscretizedStream<OUT> timeReduce(ReduceFunction<OUT> reduceFunction) {
-
-		// Since we also emit the empty windows for bookkeeping, we need to
-		// filter them out
-		DiscretizedStream<OUT> nonEmpty = filterEmpty(this);
-
-		// We extract the number of parts from each window we will merge using
-		// this afterwards
-		DataStream<Tuple2<Integer, Integer>> numOfParts = extractPartsByID(this);
-
-		// We merge the windows by the number of parts
-		return wrap(parallelMerge(numOfParts, nonEmpty, reduceFunction), false);
-
-	}
-
-	private SingleOutputStreamOperator<StreamWindow<OUT>, ?> parallelMerge(
-			DataStream<Tuple2<Integer, Integer>> numOfParts, DiscretizedStream<OUT> reduced,
-			ReduceFunction<OUT> reduceFunction) {
-
-		ParallelMerge<OUT> parallelMerger = isGrouped() ? new ParallelGroupedMerge<OUT>()
-				: new ParallelMerge<OUT>(reduceFunction);
-
-		return reduced.discretizedStream
-				.keyBy(new WindowKey<OUT>())
-				.connect(numOfParts.keyBy(0))
-				.transform(
-						"CoFlatMap",
-						reduced.discretizedStream.getType(),
-						new ParallelMergeOperator<OUT>(parallelMerger));
-	}
-
-	@Override
-	public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) {
-
-		TypeInformation<R> retType = getWindowMapReturnTypes(windowMapFunction, getType());
-
-		return mapWindow(windowMapFunction, retType);
-	}
-
-	@Override
-	public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction,
-			TypeInformation<R> returnType) {
-		DiscretizedStream<R> out = partition(transformation).transform(
-				WindowTransformation.MAPWINDOW, "Window Map", returnType,
-				new WindowMapper<OUT, R>(discretizedStream.clean(windowMapFunction))).setParallelism(discretizedStream.getParallelism()).merge();
-
-		return out;
-	}
-
-	@Override
-	public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<OUT, R> foldFunction,
-			TypeInformation<R> outType) {
-
-		DiscretizedStream<R> out = partition(transformation).transform(
-				WindowTransformation.FOLDWINDOW, "Fold Window", outType,
-				new WindowFolder<OUT, R>(discretizedStream.clean(foldFunction), initialValue))
-				.setParallelism(discretizedStream.getParallelism())
-				.merge();
-		return out;
-	}
-
-	private <R> DiscretizedStream<R> transform(WindowTransformation transformation,
-			String operatorName, TypeInformation<R> retType,
-			OneInputStreamOperator<StreamWindow<OUT>, StreamWindow<R>> operator) {
-
-		return wrap(discretizedStream.transform(operatorName, new StreamWindowTypeInfo<R>(retType),
-				operator).setParallelism(discretizedStream.getParallelism()), transformation);
-	}
-
-	private DiscretizedStream<OUT> filterEmpty(DiscretizedStream<OUT> input) {
-		StreamFilter<StreamWindow<OUT>> emptyFilter = new StreamFilter<StreamWindow<OUT>>(new EmptyWindowFilter<OUT>());
-		emptyFilter.disableInputCopy();
-		return wrap(input.discretizedStream.transform("Filter", input.discretizedStream.getType(), emptyFilter), input.isPartitioned);
-	}
-
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	private DataStream<Tuple2<Integer, Integer>> extractPartsByID(DiscretizedStream<OUT> input) {
-		StreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>> partExtractor = new StreamFlatMap<StreamWindow<OUT>, Tuple2<Integer, Integer>>(
-				new WindowPartExtractor<OUT>());
-		partExtractor.disableInputCopy();
-		return input.discretizedStream.transform("ExtractParts", new TupleTypeInfo(Tuple2.class,
-				BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), partExtractor);
-	}
-
-	private DiscretizedStream<OUT> partition(WindowTransformation transformation) {
-
-		int parallelism = discretizedStream.getParallelism();
-
-		if (isGrouped()) {
-			DiscretizedStream<OUT> out = transform(transformation, "Window partitioner", getType(),
-					new WindowPartitioner<OUT>(keyByKey)).setParallelism(parallelism);
-
-			out.keyByKey = null;
-			out.isPartitioned = true;
-
-			return out;
-		} else if (transformation == WindowTransformation.REDUCEWINDOW
-				&& parallelism != discretizedStream.getExecutionEnvironment().getParallelism()) {
-			DiscretizedStream<OUT> out = transform(transformation, "Window partitioner", getType(),
-					new WindowPartitioner<OUT>(parallelism)).setParallelism(parallelism);
-
-			out.isPartitioned = true;
-
-			return out;
-		} else {
-			return this;
-		}
-	}
-
-	private DiscretizedStream<OUT> setParallelism(int parallelism) {
-		return wrap(discretizedStream.setParallelism(parallelism), isPartitioned);
-	}
-
-	private DiscretizedStream<OUT> merge() {
-		TypeInformation<StreamWindow<OUT>> type = discretizedStream.getType();
-
-		// Only merge partitioned streams
-		if (isPartitioned) {
-			return wrap(
-					discretizedStream.keyBy(new WindowKey<OUT>()).transform("Window Merger",
-							type, new WindowMerger<OUT>()).setParallelism(discretizedStream.getParallelism()), false);
-		} else {
-			return this;
-		}
-
-	}
-
-	@SuppressWarnings("unchecked")
-	private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator<StreamWindow<R>, ?> stream,
-			boolean isPartitioned) {
-		return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.keyByKey,
-				transformation, isPartitioned);
-	}
-
-	@SuppressWarnings("unchecked")
-	private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator<StreamWindow<R>, ?> stream,
-			WindowTransformation transformation) {
-		return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.keyByKey,
-				transformation, isPartitioned);
-	}
-
-	@SuppressWarnings("rawtypes")
-	protected Class<?> getClassAtPos(int pos) {
-		Class<?> type;
-		TypeInformation<OUT> outTypeInfo = getType();
-		if (outTypeInfo.isTupleType()) {
-			type = ((TupleTypeInfo) outTypeInfo).getTypeAt(pos).getTypeClass();
-
-		} else if (outTypeInfo instanceof BasicArrayTypeInfo) {
-
-			type = ((BasicArrayTypeInfo) outTypeInfo).getComponentTypeClass();
-
-		} else if (outTypeInfo instanceof PrimitiveArrayTypeInfo) {
-			Class<?> clazz = outTypeInfo.getTypeClass();
-			if (clazz == boolean[].class) {
-				type = Boolean.class;
-			} else if (clazz == short[].class) {
-				type = Short.class;
-			} else if (clazz == int[].class) {
-				type = Integer.class;
-			} else if (clazz == long[].class) {
-				type = Long.class;
-			} else if (clazz == float[].class) {
-				type = Float.class;
-			} else if (clazz == double[].class) {
-				type = Double.class;
-			} else if (clazz == char[].class) {
-				type = Character.class;
-			} else {
-				throw new IndexOutOfBoundsException("Type could not be determined for array");
-			}
-
-		} else if (pos == 0) {
-			type = outTypeInfo.getTypeClass();
-		} else {
-			throw new IndexOutOfBoundsException("Position is out of range");
-		}
-		return type;
-	}
-
-	@Override
-	public ExecutionConfig getExecutionConfig() {
-		return discretizedStream.getExecutionConfig();
-	}
-
-	/**
-	 * Gets the output type.
-	 * 
-	 * @return The output type.
-	 */
-	public TypeInformation<OUT> getType() {
-		return ((StreamWindowTypeInfo<OUT>) discretizedStream.getType()).getInnerType();
-	}
-
-	private static <IN, OUT> TypeInformation<OUT> getWindowMapReturnTypes(
-			WindowMapFunction<IN, OUT> windowMapInterface, TypeInformation<IN> inType) {
-		return TypeExtractor.getUnaryOperatorReturnType((Function) windowMapInterface,
-				WindowMapFunction.class, true, true, inType, null, false);
-	}
-
-	protected DiscretizedStream<OUT> copy() {
-		return new DiscretizedStream<OUT>(discretizedStream, keyByKey, transformation, isPartitioned);
-	}
-
-	@Override
-	public WindowedDataStream<OUT> local() {
-		throw new UnsupportedOperationException(
-				"Local discretisation can only be applied after defining the discretisation logic");
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
deleted file mode 100644
index c1c5f6d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ /dev/null
@@ -1,867 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichFoldFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.functions.RichWindowMapFunction;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
-import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
-import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.windowing.GroupedActiveDiscretizer;
-import org.apache.flink.streaming.api.operators.windowing.GroupedStreamDiscretizer;
-import org.apache.flink.streaming.api.operators.windowing.GroupedWindowBuffer;
-import org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer;
-import org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;
-import org.apache.flink.streaming.api.windowing.WindowEvent;
-import org.apache.flink.streaming.api.windowing.WindowUtils;
-import org.apache.flink.streaming.api.windowing.WindowUtils.WindowTransformation;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
-import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger;
-import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.KeepAllEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingCountGroupedPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingCountPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingTimeGroupedPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.JumpingTimePreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.PreAggregator;
-import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountGroupedPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimePreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-
-/**
- * A {@link WindowedDataStream} represents a data stream that has been
- * discretised into windows. User defined function such as
- * {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow(WindowMapFunction)} or aggregations
- * can be applied to the windows. The results of these transformations are also
- * WindowedDataStreams of the same discretisation unit.
- * 
- * @param <T> The output type of the {@link WindowedDataStream}
- */
-public class WindowedDataStream<T> {
-
-	protected DataStream<T> dataStream;
-
-	protected boolean isLocal = false;
-
-	protected KeySelector<T, ?> discretizerKey;
-	protected KeySelector<T, ?> keyByKey;
-
-	protected WindowingHelper<T> triggerHelper;
-	protected WindowingHelper<T> evictionHelper;
-
-	protected TriggerPolicy<T> userTrigger;
-	protected EvictionPolicy<T> userEvicter;
-
-	protected WindowedDataStream(DataStream<T> dataStream, WindowingHelper<T> policyHelper) {
-		this.dataStream = dataStream;
-		this.triggerHelper = policyHelper;
-
-		if (dataStream instanceof KeyedStream) {
-			this.discretizerKey = ((KeyedStream<T, ?>) dataStream).keySelector;
-		}
-	}
-
-	protected WindowedDataStream(DataStream<T> dataStream, TriggerPolicy<T> trigger,
-			EvictionPolicy<T> evicter) {
-		this.dataStream = dataStream;
-
-		this.userTrigger = trigger;
-		this.userEvicter = evicter;
-
-		if (dataStream instanceof KeyedStream) {
-			this.discretizerKey = ((KeyedStream<T, ?>) dataStream).keySelector;
-		}
-	}
-
-	protected WindowedDataStream(WindowedDataStream<T> windowedDataStream) {
-		this.dataStream = windowedDataStream.dataStream;
-		this.discretizerKey = windowedDataStream.discretizerKey;
-		this.keyByKey = windowedDataStream.keyByKey;
-		this.triggerHelper = windowedDataStream.triggerHelper;
-		this.evictionHelper = windowedDataStream.evictionHelper;
-		this.userTrigger = windowedDataStream.userTrigger;
-		this.userEvicter = windowedDataStream.userEvicter;
-		this.isLocal = windowedDataStream.isLocal;
-	}
-
-	public WindowedDataStream() {
-	}
-
-	/**
-	 * Defines the slide size (trigger frequency) for the windowed data stream.
-	 * This controls how often the user defined function will be triggered on
-	 * the window. </br></br> For example to get a window of 5 elements with a
-	 * slide of 2 seconds use: </br></br>
-	 * {@code ds.window(Count.of(5)).every(Time.of(2,TimeUnit.SECONDS))}
-	 * </br></br> The user function in this case will be called on the 5 most
-	 * recent elements every 2 seconds
-	 * 
-	 * @param policyHelper
-	 *            The policy that define the triggering frequency
-	 * 
-	 * @return The windowed data stream with triggering set
-	 */
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public WindowedDataStream<T> every(WindowingHelper policyHelper) {
-		policyHelper.setExecutionConfig(getExecutionConfig());
-		WindowedDataStream<T> ret = this.copy();
-		if (ret.evictionHelper == null) {
-			ret.evictionHelper = ret.triggerHelper;
-			ret.triggerHelper = policyHelper;
-		}
-
-		return ret;
-	}
-
-	/**
-	 * Groups the elements of the {@link WindowedDataStream} by the given key
-	 * positions. The window sizes (evictions) and slide sizes (triggers) will
-	 * be calculated on the whole stream (in a global fashion), but the user
-	 * defined functions will be applied on a per group basis. </br></br> To get
-	 * windows and triggers on a per group basis apply the
-	 * {@link DataStream#window} operator on an already grouped data stream.
-	 * 
-	 * @param fields
-	 *            The position of the fields to group by.
-	 * @return The grouped {@link WindowedDataStream}
-	 */
-	public WindowedDataStream<T> keyBy(int... fields) {
-		if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
-			return keyBy(new KeySelectorUtil.ArrayKeySelector<T>(fields));
-		} else {
-			return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
-		}
-	}
-
-	/**
-	 * Groups the elements of the {@link WindowedDataStream} by the given field
-	 * expressions. The window sizes (evictions) and slide sizes (triggers) will
-	 * be calculated on the whole stream (in a global fashion), but the user
-	 * defined functions will be applied on a per group basis. </br></br> To get
-	 * windows and triggers on a per group basis apply the
-	 * {@link DataStream#window} operator on an already grouped data stream.
-	 * </br></br> A field expression is either the name of a public field or a
-	 * getter method with parentheses of the stream's underlying type. A dot can
-	 * be used to drill down into objects, as in
-	 * {@code "field1.getInnerField2()" }.
-	 * 
-	 * @param fields
-	 *            The fields to group by
-	 * @return The grouped {@link WindowedDataStream}
-	 */
-	public WindowedDataStream<T> keyBy(String... fields) {
-		return keyBy(new Keys.ExpressionKeys<T>(fields, getType()));
-	}
-
-	/**
-	 * Groups the elements of the {@link WindowedDataStream} using the given
-	 * {@link KeySelector}. The window sizes (evictions) and slide sizes
-	 * (triggers) will be calculated on the whole stream (in a global fashion),
-	 * but the user defined functions will be applied on a per group basis.
-	 * </br></br> To get windows and triggers on a per group basis apply the
-	 * {@link DataStream#window} operator on an already grouped data stream.
-	 * 
-	 * @param keySelector
-	 *            The keySelector used to extract the key for grouping.
-	 * @return The grouped {@link WindowedDataStream}
-	 */
-	public WindowedDataStream<T> keyBy(KeySelector<T, ?> keySelector) {
-		WindowedDataStream<T> ret = this.copy();
-		ret.keyByKey = keySelector;
-		return ret;
-	}
-
-	private WindowedDataStream<T> keyBy(Keys<T> keys) {
-		return keyBy(clean(KeySelectorUtil.getSelectorForKeys(keys, getType(),
-				getExecutionConfig())));
-	}
-
-	/**
-	 * Sets the window discretisation local, meaning that windows will be
-	 * created in parallel at environment parallelism.
-	 * 
-	 * @return The WindowedDataStream with local discretisation
-	 */
-	public WindowedDataStream<T> local() {
-		WindowedDataStream<T> out = copy();
-		out.isLocal = true;
-		return out;
-	}
-
-	/**
-	 * Returns the {@link DataStream} of {@link StreamWindow}s which represent
-	 * the discretised stream. There is no ordering guarantee for the received
-	 * windows.
-	 * 
-	 * @return The discretised stream
-	 */
-	public DataStream<StreamWindow<T>> getDiscretizedStream() {
-		if (getEviction() instanceof KeepAllEvictionPolicy) {
-			throw new RuntimeException("Cannot get discretized stream for full stream window");
-		}
-		return discretize(WindowTransformation.NONE, new BasicWindowBuffer<T>())
-				.getDiscretizedStream();
-	}
-
-	/**
-	 * Flattens the results of the window computations and streams out the
-	 * window elements.
-	 * 
-	 * @return The data stream consisting of the individual records.
-	 */
-	public DataStream<T> flatten() {
-		return dataStream;
-	}
-
-	/**
-	 * Applies a reduce transformation on the windowed data stream by reducing
-	 * the current window at every trigger.The user can also extend the
-	 * {@link RichReduceFunction} to gain access to other features provided by
-	 * the {@link org.apache.flink.api.common.functions.RichFunction} interface.
-	 * 
-	 * @param reduceFunction
-	 *            The reduce function that will be applied to the windows.
-	 * @return The transformed DataStream
-	 */
-	public DiscretizedStream<T> reduceWindow(ReduceFunction<T> reduceFunction) {
-
-		// We check whether we should apply parallel time discretization, which
-		// is a more complex exploiting the monotonic properties of time
-		// policies
-		if (WindowUtils.isTimeOnly(getTrigger(), getEviction()) && discretizerKey == null
-				&& dataStream.getParallelism() > 1) {
-			return timeReduce(reduceFunction);
-		} else {
-			WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
-					.with(clean(reduceFunction));
-
-			WindowBuffer<T> windowBuffer = getWindowBuffer(transformation);
-
-			DiscretizedStream<T> discretized = discretize(transformation, windowBuffer);
-
-			if (windowBuffer instanceof PreAggregator) {
-				return discretized;
-			} else {
-				return discretized.reduceWindow(reduceFunction);
-			}
-		}
-	}
-
-	/**
-	 * Applies a fold transformation on the windowed data stream by folding the
-	 * current window at every trigger.The user can also extend the
-	 * {@link RichFoldFunction} to gain access to other features provided by the
-	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
-	 * This version of foldWindow uses user supplied typeinformation for
-	 * serializaton. Use this only when the system is unable to detect type
-	 * information.
-	 * 
-	 * @param foldFunction
-	 *            The fold function that will be applied to the windows.
-	 * @param initialValue
-	 *            Initial value given to foldFunction
-	 * @param outType
-	 *            The output type of the operator
-	 * @return The transformed DataStream
-	 */
-	public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<T, R> foldFunction,
-			TypeInformation<R> outType) {
-
-		return discretize(WindowTransformation.FOLDWINDOW.with(clean(foldFunction)),
-				new BasicWindowBuffer<T>()).foldWindow(initialValue, foldFunction, outType);
-
-	}
-
-	/**
-	 * Applies a fold transformation on the windowed data stream by folding the
-	 * current window at every trigger.The user can also extend the
-	 * {@link RichFoldFunction} to gain access to other features provided by the
-	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
-	 * 
-	 * @param foldFunction
-	 *            The fold function that will be applied to the windows.
-	 * @param initialValue
-	 *            Initial value given to foldFunction
-	 * @return The transformed DataStream
-	 */
-	public <R> DiscretizedStream<R> foldWindow(R initialValue, FoldFunction<T, R> foldFunction) {
-
-		TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(foldFunction),
-				getType());
-		return foldWindow(initialValue, foldFunction, outType);
-	}
-
-	/**
-	 * Applies a mapWindow transformation on the windowed data stream by calling
-	 * the mapWindow function on the window at every trigger. In contrast with
-	 * the standard binary reducer, with mapWindow allows the user to access all
-	 * elements of the window at the same time through the iterable interface.
-	 * The user can also extend the {@link RichWindowMapFunction} to gain access
-	 * to other features provided by the
-	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
-	 * 
-	 * @param windowMapFunction
-	 *            The function that will be applied to the windows.
-	 * @return The transformed DataStream
-	 */
-	public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<T, R> windowMapFunction) {
-		return discretize(WindowTransformation.MAPWINDOW.with(clean(windowMapFunction)),
-				getWindowBuffer(WindowTransformation.MAPWINDOW)).mapWindow(windowMapFunction);
-	}
-
-	/**
-	 * Applies a mapWindow transformation on the windowed data stream by calling
-	 * the mapWindow function on the window at every trigger. In contrast with
-	 * the standard binary reducer, with mapWindow allows the user to access all
-	 * elements of the window at the same time through the iterable interface.
-	 * The user can also extend the {@link RichWindowMapFunction} to gain access
-	 * to other features provided by the
-	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
-	 * </br> </br> This version of mapWindow uses user supplied typeinformation
-	 * for serializaton. Use this only when the system is unable to detect type
-	 * information.
-	 * 
-	 * @param windowMapFunction
-	 *            The function that will be applied to the windows.
-	 * @param outType
-	 *            The output type of the operator.
-	 * @return The transformed DataStream
-	 */
-	public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<T, R> windowMapFunction,
-			TypeInformation<R> outType) {
-
-		return discretize(WindowTransformation.MAPWINDOW.with(windowMapFunction),
-				getWindowBuffer(WindowTransformation.MAPWINDOW)).mapWindow(windowMapFunction,
-				outType);
-	}
-
-	private DiscretizedStream<T> discretize(WindowTransformation transformation,
-			WindowBuffer<T> windowBuffer) {
-
-		OneInputStreamOperator<T, WindowEvent<T>> discretizer = getDiscretizer();
-
-		OneInputStreamOperator<WindowEvent<T>, StreamWindow<T>> bufferOperator = getBufferOperator(windowBuffer);
-
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		TypeInformation<WindowEvent<T>> bufferEventType = new TupleTypeInfo(WindowEvent.class,
-				getType(), BasicTypeInfo.INT_TYPE_INFO);
-
-		int parallelism = getDiscretizerParallelism(transformation);
-
-		return new DiscretizedStream<T>(dataStream
-				.transform(discretizer.getClass().getSimpleName(), bufferEventType, discretizer)
-				.setParallelism(parallelism)
-				.transform(windowBuffer.getClass().getSimpleName(),
-						new StreamWindowTypeInfo<T>(getType()), bufferOperator)
-				.setParallelism(parallelism), keyByKey, transformation, false);
-
-	}
-
-	/**
-	 * Returns the parallelism for the stream discretizer. The returned
-	 * parallelism is either 1 for for non-parallel global policies (or when the
-	 * input stream is non-parallel), environment parallelism for the policies
-	 * that can run in parallel (such as, any ditributed policy, reduce by count
-	 * or time).
-	 * 
-	 * @param transformation
-	 *            The applied transformation
-	 * @return The parallelism for the stream discretizer
-	 */
-	private int getDiscretizerParallelism(WindowTransformation transformation) {
-		return isLocal
-				|| (transformation == WindowTransformation.REDUCEWINDOW && WindowUtils
-						.isParallelPolicy(getTrigger(), getEviction(), dataStream.getParallelism()))
-				|| (discretizerKey != null) ? dataStream.environment.getParallelism() : 1;
-
-	}
-
-	/**
-	 * Dedicated method for applying parallel time reduce transformations on
-	 * windows
-	 * 
-	 * @param reduceFunction
-	 *            Reduce function to apply
-	 * @return The transformed stream
-	 */
-	protected DiscretizedStream<T> timeReduce(ReduceFunction<T> reduceFunction) {
-
-		WindowTransformation transformation = WindowTransformation.REDUCEWINDOW
-				.with(clean(reduceFunction));
-
-		// We get the windowbuffer and set it to emit empty windows with
-		// sequential IDs. This logic is necessary to merge windows created in
-		// parallel.
-		WindowBuffer<T> windowBuffer = getWindowBuffer(transformation).emitEmpty().sequentialID();
-
-		// If there is a groupby for the reduce operation we apply it before the
-		// discretizers, because we will forward everything afterwards to
-		// exploit task chaining
-		if (keyByKey != null) {
-			dataStream = dataStream.keyBy(keyByKey);
-		}
-
-		// We discretize the stream and call the timeReduce function of the
-		// discretized stream, we also pass the type of the windowbuffer
-		DiscretizedStream<T> discretized = discretize(transformation, windowBuffer);
-
-		if (getEviction() instanceof KeepAllEvictionPolicy
-				&& !(windowBuffer instanceof PreAggregator)) {
-			throw new RuntimeException(
-					"Error in preaggregator logic, parallel time reduce should always be preaggregated");
-		}
-
-		return discretized.timeReduce(reduceFunction);
-
-	}
-
-	/**
-	 * Based on the defined policies, returns the stream discretizer to be used
-	 */
-	private OneInputStreamOperator<T, WindowEvent<T>> getDiscretizer() {
-		if (discretizerKey == null) {
-			return new StreamDiscretizer<T>(getTrigger(), getEviction());
-		} else if (getTrigger() instanceof CentralActiveTrigger) {
-			return new GroupedActiveDiscretizer<T>(discretizerKey,
-					(CentralActiveTrigger<T>) getTrigger(),
-					(CloneableEvictionPolicy<T>) getEviction());
-		} else {
-			return new GroupedStreamDiscretizer<T>(discretizerKey,
-					(CloneableTriggerPolicy<T>) getTrigger(),
-					(CloneableEvictionPolicy<T>) getEviction());
-		}
-
-	}
-
-	private OneInputStreamOperator<WindowEvent<T>, StreamWindow<T>> getBufferOperator(
-			WindowBuffer<T> windowBuffer) {
-		if (discretizerKey == null) {
-			return new StreamWindowBuffer<T>(windowBuffer);
-		} else {
-			return new GroupedWindowBuffer<T>(windowBuffer, discretizerKey);
-		}
-	}
-
-	/**
-	 * Based on the given policies returns the WindowBuffer used to store the
-	 * elements in the window. This is the module that also encapsulates the
-	 * pre-aggregator logic when it is applicable, reducing the space cost, and
-	 * trigger latency.
-	 * 
-	 */
-	@SuppressWarnings("unchecked")
-	private WindowBuffer<T> getWindowBuffer(WindowTransformation transformation) {
-		TriggerPolicy<T> trigger = getTrigger();
-		EvictionPolicy<T> eviction = getEviction();
-
-		if (transformation == WindowTransformation.REDUCEWINDOW) {
-			if (WindowUtils.isTumblingPolicy(trigger, eviction)) {
-				if (eviction instanceof KeepAllEvictionPolicy) {
-					if (keyByKey == null) {
-						return new TumblingPreReducer<T>(
-								(ReduceFunction<T>) transformation.getUDF(), getType()
-										.createSerializer(getExecutionConfig())).noEvict();
-					} else {
-						return new TumblingGroupedPreReducer<T>(
-								(ReduceFunction<T>) transformation.getUDF(), keyByKey,
-								getType().createSerializer(getExecutionConfig())).noEvict();
-					}
-				} else {
-					if (keyByKey == null) {
-						return new TumblingPreReducer<T>(
-								(ReduceFunction<T>) transformation.getUDF(), getType()
-										.createSerializer(getExecutionConfig()));
-					} else {
-						return new TumblingGroupedPreReducer<T>(
-								(ReduceFunction<T>) transformation.getUDF(), keyByKey,
-								getType().createSerializer(getExecutionConfig()));
-					}
-				}
-			} else if (WindowUtils.isSlidingCountPolicy(trigger, eviction)) {
-				if (keyByKey == null) {
-					return new SlidingCountPreReducer<T>(
-							clean((ReduceFunction<T>) transformation.getUDF()), dataStream
-									.getType().createSerializer(getExecutionConfig()),
-							WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
-							((CountTriggerPolicy<?>) trigger).getStart());
-				} else {
-					return new SlidingCountGroupedPreReducer<T>(
-							clean((ReduceFunction<T>) transformation.getUDF()), dataStream
-									.getType().createSerializer(getExecutionConfig()), keyByKey,
-							WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
-							((CountTriggerPolicy<?>) trigger).getStart());
-				}
-
-			} else if (WindowUtils.isSlidingTimePolicy(trigger, eviction)) {
-				if (keyByKey == null) {
-					return new SlidingTimePreReducer<T>(
-							(ReduceFunction<T>) transformation.getUDF(), dataStream.getType()
-									.createSerializer(getExecutionConfig()),
-							WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
-							WindowUtils.getTimeStampWrapper(trigger));
-				} else {
-					return new SlidingTimeGroupedPreReducer<T>(
-							(ReduceFunction<T>) transformation.getUDF(), dataStream.getType()
-									.createSerializer(getExecutionConfig()), keyByKey,
-							WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
-							WindowUtils.getTimeStampWrapper(trigger));
-				}
-
-			} else if (WindowUtils.isJumpingCountPolicy(trigger, eviction)) {
-				if (keyByKey == null) {
-					return new JumpingCountPreReducer<T>(
-							(ReduceFunction<T>) transformation.getUDF(), getType()
-									.createSerializer(getExecutionConfig()),
-							WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
-				} else {
-					return new JumpingCountGroupedPreReducer<T>(
-							(ReduceFunction<T>) transformation.getUDF(), keyByKey, getType()
-									.createSerializer(getExecutionConfig()),
-							WindowUtils.getSlideSize(trigger) - WindowUtils.getWindowSize(eviction));
-				}
-			} else if (WindowUtils.isJumpingTimePolicy(trigger, eviction)) {
-				if (keyByKey == null) {
-					return new JumpingTimePreReducer<T>(
-							(ReduceFunction<T>) transformation.getUDF(), getType()
-									.createSerializer(getExecutionConfig()),
-							WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction),
-							WindowUtils.getTimeStampWrapper(trigger));
-				} else {
-					return new JumpingTimeGroupedPreReducer<T>(
-							(ReduceFunction<T>) transformation.getUDF(), keyByKey, getType()
-									.createSerializer(getExecutionConfig()),
-							WindowUtils.getSlideSize(trigger), WindowUtils.getWindowSize(eviction),
-							WindowUtils.getTimeStampWrapper(trigger));
-				}
-			}
-		}
-
-		if (eviction instanceof KeepAllEvictionPolicy) {
-			throw new RuntimeException(
-					"Full stream policy can only be used with operations that support preaggregations, such as reduce or aggregations");
-		} else {
-			return new BasicWindowBuffer<T>();
-		}
-	}
-
-	/**
-	 * Applies an aggregation that sums every window of the data stream at the
-	 * given position.
-	 * 
-	 * @param positionToSum
-	 *            The position in the tuple/array to sum
-	 * @return The transformed DataStream.
-	 */
-	public WindowedDataStream<T> sum(int positionToSum) {
-		return aggregate(new SumAggregator<T>(positionToSum, getType(), getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that sums every window of the pojo data stream at
-	 * the given field for every window. </br></br> A field expression is either
-	 * the name of a public field or a getter method with parentheses of the
-	 * stream's underlying type. A dot can be used to drill down into objects,
-	 * as in {@code "field1.getInnerField2()" }.
-	 * 
-	 * @param field
-	 *            The field to sum
-	 * @return The transformed DataStream.
-	 */
-	public WindowedDataStream<T> sum(String field) {
-		return aggregate(new SumAggregator<T>(field, getType(), getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the minimum value of every window
-	 * of the data stream at the given position.
-	 * 
-	 * @param positionToMin
-	 *            The position to minimize
-	 * @return The transformed DataStream.
-	 */
-	public WindowedDataStream<T> min(int positionToMin) {
-		return aggregate(new ComparableAggregator<T>(positionToMin, getType(), AggregationType.MIN,
-				getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the minimum value of the pojo data
-	 * stream at the given field expression for every window. </br></br>A field
-	 * expression is either the name of a public field or a getter method with
-	 * parentheses of the {@link DataStream}S underlying type. A dot can be used
-	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
-	 * 
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
-	 * @return The transformed DataStream.
-	 */
-	public WindowedDataStream<T> min(String field) {
-		return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MIN,
-				false, getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that gives the minimum element of every window of
-	 * the data stream by the given position. If more elements have the same
-	 * minimum value the operator returns the first element by default.
-	 * 
-	 * @param positionToMinBy
-	 *            The position to minimize by
-	 * @return The transformed DataStream.
-	 */
-	public WindowedDataStream<T> minBy(int positionToMinBy) {
-		return this.minBy(positionToMinBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that gives the minimum element of every window of
-	 * the data stream by the given position. If more elements have the same
-	 * minimum value the operator returns the first element by default.
-	 * 
-	 * @param positionToMinBy
-	 *            The position to minimize by
-	 * @return The transformed DataStream.
-	 */
-	public WindowedDataStream<T> minBy(String positionToMinBy) {
-		return this.minBy(positionToMinBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that gives the minimum element of every window of
-	 * the data stream by the given position. If more elements have the same
-	 * minimum value the operator returns either the first or last one depending
-	 * on the parameter setting.
-	 * 
-	 * @param positionToMinBy
-	 *            The position to minimize
-	 * @param first
-	 *            If true, then the operator return the first element with the
-	 *            minimum value, otherwise returns the last
-	 * @return The transformed DataStream.
-	 */
-	public WindowedDataStream<T> minBy(int positionToMinBy, boolean first) {
-		return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationType.MINBY, first,
-				getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the minimum element of the pojo
-	 * data stream by the given field expression for every window. A field
-	 * expression is either the name of a public field or a getter method with
-	 * parentheses of the {@link DataStream}S underlying type. A dot can be used
-	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
-	 * 
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
-	 * @param first
-	 *            If True then in case of field equality the first object will
-	 *            be returned
-	 * @return The transformed DataStream.
-	 */
-	public WindowedDataStream<T> minBy(String field, boolean first) {
-		return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MINBY,
-				first, getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that gives the maximum value of every window of
-	 * the data stream at the given position.
-	 * 
-	 * @param positionToMax
-	 *            The position to maximize
-	 * @return The transformed DataStream.
-	 */
-	public WindowedDataStream<T> max(int positionToMax) {
-		return aggregate(new ComparableAggregator<T>(positionToMax, getType(), AggregationType.MAX,
-				getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the maximum value of the pojo data
-	 * stream at the given field expression for every window. A field expression
-	 * is either the name of a public field or a getter method with parentheses
-	 * of the {@link DataStream}S underlying type. A dot can be used to drill
-	 * down into objects, as in {@code "field1.getInnerField2()" }.
-	 * 
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
-	 * @return The transformed DataStream.
-	 */
-	public WindowedDataStream<T> max(String field) {
-		return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAX,
-				false, getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that gives the maximum element of every window of
-	 * the data stream by the given position. If more elements have the same
-	 * maximum value the operator returns the first by default.
-	 * 
-	 * @param positionToMaxBy
-	 *            The position to maximize by
-	 * @return The transformed DataStream.
-	 */
-	public WindowedDataStream<T> maxBy(int positionToMaxBy) {
-		return this.maxBy(positionToMaxBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that gives the maximum element of every window of
-	 * the data stream by the given position. If more elements have the same
-	 * maximum value the operator returns the first by default.
-	 * 
-	 * @param positionToMaxBy
-	 *            The position to maximize by
-	 * @return The transformed DataStream.
-	 */
-	public WindowedDataStream<T> maxBy(String positionToMaxBy) {
-		return this.maxBy(positionToMaxBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that gives the maximum element of every window of
-	 * the data stream by the given position. If more elements have the same
-	 * maximum value the operator returns either the first or last one depending
-	 * on the parameter setting.
-	 * 
-	 * @param positionToMaxBy
-	 *            The position to maximize by
-	 * @param first
-	 *            If true, then the operator return the first element with the
-	 *            maximum value, otherwise returns the last
-	 * @return The transformed DataStream.
-	 */
-	public WindowedDataStream<T> maxBy(int positionToMaxBy, boolean first) {
-		return aggregate(new ComparableAggregator<T>(positionToMaxBy, getType(), AggregationType.MAXBY, first,
-				getExecutionConfig()));
-	}
-
-	/**
-	 * Applies an aggregation that that gives the maximum element of the pojo
-	 * data stream by the given field expression for every window. A field
-	 * expression is either the name of a public field or a getter method with
-	 * parentheses of the {@link DataStream}S underlying type. A dot can be used
-	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
-	 * 
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
-	 * @param first
-	 *            If True then in case of field equality the first object will
-	 *            be returned
-	 * @return The transformed DataStream.
-	 */
-	public WindowedDataStream<T> maxBy(String field, boolean first) {
-		return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAXBY, first,
-				getExecutionConfig()));
-	}
-
-	private WindowedDataStream<T> aggregate(AggregationFunction<T> aggregator) {
-		return reduceWindow(aggregator);
-	}
-
-	protected TriggerPolicy<T> getTrigger() {
-
-		if (triggerHelper != null) {
-			return triggerHelper.toTrigger();
-		} else if (userTrigger != null) {
-			return userTrigger;
-		} else {
-			throw new RuntimeException("Trigger must not be null");
-		}
-
-	}
-
-	protected EvictionPolicy<T> getEviction() {
-
-		if (evictionHelper != null) {
-			return evictionHelper.toEvict();
-		} else if (userEvicter == null || userEvicter instanceof TumblingEvictionPolicy) {
-			if (triggerHelper instanceof Time) {
-				return triggerHelper.toEvict();
-			} else {
-				return new TumblingEvictionPolicy<T>();
-			}
-		} else {
-			return userEvicter;
-		}
-
-	}
-
-	public <F> F clean(F f) {
-		if (getExecutionConfig().isClosureCleanerEnabled()) {
-			ClosureCleaner.clean(f, true);
-		}
-		ClosureCleaner.ensureSerializable(f);
-		return f;
-	}
-
-	protected boolean isGrouped() {
-		return keyByKey != null;
-	}
-
-	/**
-	 * Gets the output type.
-	 * 
-	 * @return The output type.
-	 */
-	public TypeInformation<T> getType() {
-		return dataStream.getType();
-	}
-
-	public ExecutionConfig getExecutionConfig() {
-		return dataStream.getExecutionConfig();
-	}
-
-	protected WindowedDataStream<T> copy() {
-		return new WindowedDataStream<T>(this);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/RichWindowMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/RichWindowMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/RichWindowMapFunction.java
deleted file mode 100644
index ff045a5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/RichWindowMapFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.util.Collector;
-
-/**
- * Abstract class for defining rich mapWindow transformation to be applied on
- * {@link WindowedDataStream}s. The mapWindow function will be called on each
- * {@link StreamWindow}.</p> In addition the user can access the functionality
- * provided by the {@link RichFunction} interface.
- */
-public abstract class RichWindowMapFunction<IN, OUT> extends AbstractRichFunction implements
-		WindowMapFunction<IN, OUT> {
-
-	private static final long serialVersionUID = 9052714915997374185L;
-
-	@Override
-	public abstract void mapWindow(Iterable<IN> values, Collector<OUT> out) throws Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/WindowMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/WindowMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/WindowMapFunction.java
deleted file mode 100644
index ececb29..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/WindowMapFunction.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.util.Collector;
-
-/**
- * Interface for defining mapWindow transformation to be applied on
- * {@link WindowedDataStream}s. The mapWindow function will be called on each
- * {@link StreamWindow}.
- */
-public interface WindowMapFunction<T, O> extends Function, Serializable {
-
-	void mapWindow(Iterable<T> values, Collector<O> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
index 7859b2c..86a12e2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.functions.windowing.delta;
 
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.functions.windowing.delta.extractor.Extractor;
 
 /**
  * This delta function calculates the cosine distance between two given vectors.

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
index f9e8ec7..23efbf2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.functions.windowing.delta;
 
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.functions.windowing.delta.extractor.Extractor;
 
 /**
  * This delta function calculates the euclidean distance between two given

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
index bd5b0b9..7a4e01a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.functions.windowing.delta;
 
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+import org.apache.flink.streaming.api.functions.windowing.delta.extractor.Extractor;
 
 /**
  * Extend this abstract class to implement a delta function which is aware of

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTuple.java
new file mode 100644
index 0000000..baceba4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ArrayFromTuple.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Converts a Tuple to an Object-Array. The field which should be included in
+ * the array can selected and reordered as needed.
+ */
+public class ArrayFromTuple implements Extractor<Tuple, Object[]> {
+
+	/**
+	 * Auto generated version id
+	 */
+	private static final long serialVersionUID = -6076121226427616818L;
+	int[] order = null;
+
+	/**
+	 * Using this constructor the extractor will convert the whole tuple (all
+	 * fields in the original order) to an array.
+	 */
+	public ArrayFromTuple() {
+		// noting to do
+	}
+
+	/**
+	 * Using this constructor the extractor will combine the fields as specified
+	 * in the indexes parameter in an object array.
+	 * 
+	 * @param indexes
+	 *            the field ids (enumerated from 0)
+	 */
+	public ArrayFromTuple(int... indexes) {
+		this.order = indexes;
+	}
+
+	@Override
+	public Object[] extract(Tuple in) {
+		Object[] output;
+
+		if (order == null) {
+			// copy the whole tuple
+			output = new Object[in.getArity()];
+			for (int i = 0; i < in.getArity(); i++) {
+				output[i] = in.getField(i);
+			}
+		} else {
+			// copy user specified order
+			output = new Object[order.length];
+			for (int i = 0; i < order.length; i++) {
+				output[i] = in.getField(order[i]);
+			}
+		}
+
+		return output;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtract.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtract.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtract.java
new file mode 100644
index 0000000..89c3a32
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/ConcatenatedExtract.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+/**
+ * Combines two extractors which will be executed one after each other.
+ *
+ * @param <FROM>
+ *            The input type of the first extractor.
+ * @param <OVER>
+ *            The output type of the first and the input type of the second
+ *            extractor.
+ * @param <TO>
+ *            The output type of the second extractor and the output type of the
+ *            over all extraction.
+ */
+public class ConcatenatedExtract<FROM, OVER, TO> implements Extractor<FROM, TO> {
+
+	/**
+	 * auto-generated id
+	 */
+	private static final long serialVersionUID = -7807197760725651752L;
+
+	private Extractor<FROM, OVER> e1;
+	private Extractor<OVER, TO> e2;
+
+	/**
+	 * Combines two extractors which will be executed one after each other.
+	 * 
+	 * @param e1
+	 *            First extractor: This extractor gets applied to the input data
+	 *            first. Its output as then passed as input to the second
+	 *            extractor.
+	 * @param e2
+	 *            Second extractor: This extractor gets the output of the first
+	 *            extractor as input. Its output is then the result of the over
+	 *            all extraction.
+	 */
+	public ConcatenatedExtract(Extractor<FROM, OVER> e1, Extractor<OVER, TO> e2) {
+		this.e1 = e1;
+		this.e2 = e2;
+	}
+
+	@Override
+	public TO extract(FROM in) {
+		return e2.extract(e1.extract(in));
+	}
+
+	public <OUT> ConcatenatedExtract<FROM, TO, OUT> add(Extractor<TO, OUT> e3) {
+		return new ConcatenatedExtract<FROM, TO, OUT>(this, e3);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java
new file mode 100644
index 0000000..8cd0014
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/Extractor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import java.io.Serializable;
+
+/**
+ * Extractors allow to extract/convert one type to another. They are mostly used
+ * to extract some fields out of a more complex structure (Tuple/Array) to run
+ * further calculation on the extraction result.
+ * 
+ * @param <FROM>
+ *            The input data type.
+ * @param <TO>
+ *            The output data type.
+ */
+public interface Extractor<FROM, TO> extends Serializable {
+
+	/**
+	 * Extracts/Converts the given input to an object of the output type
+	 * 
+	 * @param in
+	 *            the input data
+	 * @return the extracted/converted data
+	 */
+	public TO extract(FROM in);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java
new file mode 100644
index 0000000..f9d0a2b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromArray.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import java.lang.reflect.Array;
+
+/**
+ * Extracts a single field out of an array.
+ * 
+ * @param <OUT>
+ *            The type of the extracted field.
+ */
+public class FieldFromArray<OUT> implements Extractor<Object, OUT> {
+
+	/**
+	 * Auto-gernated version id
+	 */
+	private static final long serialVersionUID = -5161386546695574359L;
+	private int fieldId = 0;
+
+	/**
+	 * Extracts the first field (id 0) from the array
+	 */
+	public FieldFromArray() {
+		// noting to do => will use default 0
+	}
+
+	/**
+	 * Extracts the field with the given id from the array.
+	 * 
+	 * @param fieldId
+	 *            The id of the field which will be extracted from the array.
+	 */
+	public FieldFromArray(int fieldId) {
+		this.fieldId = fieldId;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public OUT extract(Object in) {
+		return (OUT) Array.get(in, fieldId);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTuple.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTuple.java
new file mode 100644
index 0000000..627afca
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/extractor/FieldFromTuple.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta.extractor;
+
+import org.apache.flink.api.java.tuple.Tuple;
+
+/**
+ * Extracts a single field out of a tuple.
+ * 
+ * @param <OUT>
+ *            The type of the extracted field.
+ */
+public class FieldFromTuple<OUT> implements Extractor<Tuple, OUT> {
+
+	/**
+	 * Auto-gernated version id
+	 */
+	private static final long serialVersionUID = -5161386546695574359L;
+	private int fieldId = 0;
+
+	/**
+	 * Extracts the first field (id 0) from the tuple
+	 */
+	public FieldFromTuple() {
+		// noting to do => will use default 0
+	}
+
+	/**
+	 * Extracts the field with the given id from the tuple.
+	 * 
+	 * @param fieldId
+	 *            The id of the field which will be extracted from the tuple.
+	 */
+	public FieldFromTuple(int fieldId) {
+		this.fieldId = fieldId;
+	}
+
+	@Override
+	public OUT extract(Tuple in) {
+		return in.getField(fieldId);
+	}
+
+}